##// END OF EJS Templates
wireproto: use new peer interface...
Gregory Szorc -
r33805:dedab036 default
parent child Browse files
Show More
@@ -1,426 +1,422 b''
1 # httppeer.py - HTTP repository proxy classes for mercurial
1 # httppeer.py - HTTP repository proxy classes for mercurial
2 #
2 #
3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
4 # Copyright 2006 Vadim Gelfer <vadim.gelfer@gmail.com>
4 # Copyright 2006 Vadim Gelfer <vadim.gelfer@gmail.com>
5 #
5 #
6 # This software may be used and distributed according to the terms of the
6 # This software may be used and distributed according to the terms of the
7 # GNU General Public License version 2 or any later version.
7 # GNU General Public License version 2 or any later version.
8
8
9 from __future__ import absolute_import
9 from __future__ import absolute_import
10
10
11 import errno
11 import errno
12 import os
12 import os
13 import socket
13 import socket
14 import struct
14 import struct
15 import tempfile
15 import tempfile
16
16
17 from .i18n import _
17 from .i18n import _
18 from .node import nullid
18 from .node import nullid
19 from . import (
19 from . import (
20 bundle2,
20 bundle2,
21 error,
21 error,
22 httpconnection,
22 httpconnection,
23 pycompat,
23 pycompat,
24 repository,
25 statichttprepo,
24 statichttprepo,
26 url,
25 url,
27 util,
26 util,
28 wireproto,
27 wireproto,
29 )
28 )
30
29
31 httplib = util.httplib
30 httplib = util.httplib
32 urlerr = util.urlerr
31 urlerr = util.urlerr
33 urlreq = util.urlreq
32 urlreq = util.urlreq
34
33
35 def encodevalueinheaders(value, header, limit):
34 def encodevalueinheaders(value, header, limit):
36 """Encode a string value into multiple HTTP headers.
35 """Encode a string value into multiple HTTP headers.
37
36
38 ``value`` will be encoded into 1 or more HTTP headers with the names
37 ``value`` will be encoded into 1 or more HTTP headers with the names
39 ``header-<N>`` where ``<N>`` is an integer starting at 1. Each header
38 ``header-<N>`` where ``<N>`` is an integer starting at 1. Each header
40 name + value will be at most ``limit`` bytes long.
39 name + value will be at most ``limit`` bytes long.
41
40
42 Returns an iterable of 2-tuples consisting of header names and values.
41 Returns an iterable of 2-tuples consisting of header names and values.
43 """
42 """
44 fmt = header + '-%s'
43 fmt = header + '-%s'
45 valuelen = limit - len(fmt % '000') - len(': \r\n')
44 valuelen = limit - len(fmt % '000') - len(': \r\n')
46 result = []
45 result = []
47
46
48 n = 0
47 n = 0
49 for i in xrange(0, len(value), valuelen):
48 for i in xrange(0, len(value), valuelen):
50 n += 1
49 n += 1
51 result.append((fmt % str(n), value[i:i + valuelen]))
50 result.append((fmt % str(n), value[i:i + valuelen]))
52
51
53 return result
52 return result
54
53
55 def _wraphttpresponse(resp):
54 def _wraphttpresponse(resp):
56 """Wrap an HTTPResponse with common error handlers.
55 """Wrap an HTTPResponse with common error handlers.
57
56
58 This ensures that any I/O from any consumer raises the appropriate
57 This ensures that any I/O from any consumer raises the appropriate
59 error and messaging.
58 error and messaging.
60 """
59 """
61 origread = resp.read
60 origread = resp.read
62
61
63 class readerproxy(resp.__class__):
62 class readerproxy(resp.__class__):
64 def read(self, size=None):
63 def read(self, size=None):
65 try:
64 try:
66 return origread(size)
65 return origread(size)
67 except httplib.IncompleteRead as e:
66 except httplib.IncompleteRead as e:
68 # e.expected is an integer if length known or None otherwise.
67 # e.expected is an integer if length known or None otherwise.
69 if e.expected:
68 if e.expected:
70 msg = _('HTTP request error (incomplete response; '
69 msg = _('HTTP request error (incomplete response; '
71 'expected %d bytes got %d)') % (e.expected,
70 'expected %d bytes got %d)') % (e.expected,
72 len(e.partial))
71 len(e.partial))
73 else:
72 else:
74 msg = _('HTTP request error (incomplete response)')
73 msg = _('HTTP request error (incomplete response)')
75
74
76 raise error.PeerTransportError(
75 raise error.PeerTransportError(
77 msg,
76 msg,
78 hint=_('this may be an intermittent network failure; '
77 hint=_('this may be an intermittent network failure; '
79 'if the error persists, consider contacting the '
78 'if the error persists, consider contacting the '
80 'network or server operator'))
79 'network or server operator'))
81 except httplib.HTTPException as e:
80 except httplib.HTTPException as e:
82 raise error.PeerTransportError(
81 raise error.PeerTransportError(
83 _('HTTP request error (%s)') % e,
82 _('HTTP request error (%s)') % e,
84 hint=_('this may be an intermittent network failure; '
83 hint=_('this may be an intermittent network failure; '
85 'if the error persists, consider contacting the '
84 'if the error persists, consider contacting the '
86 'network or server operator'))
85 'network or server operator'))
87
86
88 resp.__class__ = readerproxy
87 resp.__class__ = readerproxy
89
88
90 class httppeer(wireproto.wirepeer, repository.legacypeer):
89 class httppeer(wireproto.wirepeer):
91 def __init__(self, ui, path):
90 def __init__(self, ui, path):
92 self._path = path
91 self._path = path
93 self._caps = None
92 self._caps = None
94 self._urlopener = None
93 self._urlopener = None
95 self._requestbuilder = None
94 self._requestbuilder = None
96 u = util.url(path)
95 u = util.url(path)
97 if u.query or u.fragment:
96 if u.query or u.fragment:
98 raise error.Abort(_('unsupported URL component: "%s"') %
97 raise error.Abort(_('unsupported URL component: "%s"') %
99 (u.query or u.fragment))
98 (u.query or u.fragment))
100
99
101 # urllib cannot handle URLs with embedded user or passwd
100 # urllib cannot handle URLs with embedded user or passwd
102 self._url, authinfo = u.authinfo()
101 self._url, authinfo = u.authinfo()
103
102
104 self._ui = ui
103 self._ui = ui
105 ui.debug('using %s\n' % self._url)
104 ui.debug('using %s\n' % self._url)
106
105
107 self._urlopener = url.opener(ui, authinfo)
106 self._urlopener = url.opener(ui, authinfo)
108 self._requestbuilder = urlreq.request
107 self._requestbuilder = urlreq.request
109
108
110 # TODO remove once peerrepository isn't in inheritance.
111 self._capabilities = self.capabilities
112
113 def __del__(self):
109 def __del__(self):
114 urlopener = getattr(self, '_urlopener', None)
110 urlopener = getattr(self, '_urlopener', None)
115 if urlopener:
111 if urlopener:
116 for h in urlopener.handlers:
112 for h in urlopener.handlers:
117 h.close()
113 h.close()
118 getattr(h, "close_all", lambda : None)()
114 getattr(h, "close_all", lambda : None)()
119
115
120 # Begin of _basepeer interface.
116 # Begin of _basepeer interface.
121
117
122 @util.propertycache
118 @util.propertycache
123 def ui(self):
119 def ui(self):
124 return self._ui
120 return self._ui
125
121
126 def url(self):
122 def url(self):
127 return self._path
123 return self._path
128
124
129 def local(self):
125 def local(self):
130 return None
126 return None
131
127
132 def peer(self):
128 def peer(self):
133 return self
129 return self
134
130
135 def canpush(self):
131 def canpush(self):
136 return True
132 return True
137
133
138 def close(self):
134 def close(self):
139 pass
135 pass
140
136
141 # End of _basepeer interface.
137 # End of _basepeer interface.
142
138
143 # Begin of _basewirepeer interface.
139 # Begin of _basewirepeer interface.
144
140
145 def capabilities(self):
141 def capabilities(self):
146 if self._caps is None:
142 if self._caps is None:
147 try:
143 try:
148 self._fetchcaps()
144 self._fetchcaps()
149 except error.RepoError:
145 except error.RepoError:
150 self._caps = set()
146 self._caps = set()
151 self.ui.debug('capabilities: %s\n' %
147 self.ui.debug('capabilities: %s\n' %
152 (' '.join(self._caps or ['none'])))
148 (' '.join(self._caps or ['none'])))
153 return self._caps
149 return self._caps
154
150
155 # End of _basewirepeer interface.
151 # End of _basewirepeer interface.
156
152
157 # look up capabilities only when needed
153 # look up capabilities only when needed
158
154
159 def _fetchcaps(self):
155 def _fetchcaps(self):
160 self._caps = set(self._call('capabilities').split())
156 self._caps = set(self._call('capabilities').split())
161
157
162 def _callstream(self, cmd, _compressible=False, **args):
158 def _callstream(self, cmd, _compressible=False, **args):
163 if cmd == 'pushkey':
159 if cmd == 'pushkey':
164 args['data'] = ''
160 args['data'] = ''
165 data = args.pop('data', None)
161 data = args.pop('data', None)
166 headers = args.pop('headers', {})
162 headers = args.pop('headers', {})
167
163
168 self.ui.debug("sending %s command\n" % cmd)
164 self.ui.debug("sending %s command\n" % cmd)
169 q = [('cmd', cmd)]
165 q = [('cmd', cmd)]
170 headersize = 0
166 headersize = 0
171 varyheaders = []
167 varyheaders = []
172 # Important: don't use self.capable() here or else you end up
168 # Important: don't use self.capable() here or else you end up
173 # with infinite recursion when trying to look up capabilities
169 # with infinite recursion when trying to look up capabilities
174 # for the first time.
170 # for the first time.
175 postargsok = self._caps is not None and 'httppostargs' in self._caps
171 postargsok = self._caps is not None and 'httppostargs' in self._caps
176 # TODO: support for httppostargs when data is a file-like
172 # TODO: support for httppostargs when data is a file-like
177 # object rather than a basestring
173 # object rather than a basestring
178 canmungedata = not data or isinstance(data, basestring)
174 canmungedata = not data or isinstance(data, basestring)
179 if postargsok and canmungedata:
175 if postargsok and canmungedata:
180 strargs = urlreq.urlencode(sorted(args.items()))
176 strargs = urlreq.urlencode(sorted(args.items()))
181 if strargs:
177 if strargs:
182 if not data:
178 if not data:
183 data = strargs
179 data = strargs
184 elif isinstance(data, basestring):
180 elif isinstance(data, basestring):
185 data = strargs + data
181 data = strargs + data
186 headers['X-HgArgs-Post'] = len(strargs)
182 headers['X-HgArgs-Post'] = len(strargs)
187 else:
183 else:
188 if len(args) > 0:
184 if len(args) > 0:
189 httpheader = self.capable('httpheader')
185 httpheader = self.capable('httpheader')
190 if httpheader:
186 if httpheader:
191 headersize = int(httpheader.split(',', 1)[0])
187 headersize = int(httpheader.split(',', 1)[0])
192 if headersize > 0:
188 if headersize > 0:
193 # The headers can typically carry more data than the URL.
189 # The headers can typically carry more data than the URL.
194 encargs = urlreq.urlencode(sorted(args.items()))
190 encargs = urlreq.urlencode(sorted(args.items()))
195 for header, value in encodevalueinheaders(encargs, 'X-HgArg',
191 for header, value in encodevalueinheaders(encargs, 'X-HgArg',
196 headersize):
192 headersize):
197 headers[header] = value
193 headers[header] = value
198 varyheaders.append(header)
194 varyheaders.append(header)
199 else:
195 else:
200 q += sorted(args.items())
196 q += sorted(args.items())
201 qs = '?%s' % urlreq.urlencode(q)
197 qs = '?%s' % urlreq.urlencode(q)
202 cu = "%s%s" % (self._url, qs)
198 cu = "%s%s" % (self._url, qs)
203 size = 0
199 size = 0
204 if util.safehasattr(data, 'length'):
200 if util.safehasattr(data, 'length'):
205 size = data.length
201 size = data.length
206 elif data is not None:
202 elif data is not None:
207 size = len(data)
203 size = len(data)
208 if size and self.ui.configbool('ui', 'usehttp2'):
204 if size and self.ui.configbool('ui', 'usehttp2'):
209 headers['Expect'] = '100-Continue'
205 headers['Expect'] = '100-Continue'
210 headers['X-HgHttp2'] = '1'
206 headers['X-HgHttp2'] = '1'
211 if data is not None and 'Content-Type' not in headers:
207 if data is not None and 'Content-Type' not in headers:
212 headers['Content-Type'] = 'application/mercurial-0.1'
208 headers['Content-Type'] = 'application/mercurial-0.1'
213
209
214 # Tell the server we accept application/mercurial-0.2 and multiple
210 # Tell the server we accept application/mercurial-0.2 and multiple
215 # compression formats if the server is capable of emitting those
211 # compression formats if the server is capable of emitting those
216 # payloads.
212 # payloads.
217 protoparams = []
213 protoparams = []
218
214
219 mediatypes = set()
215 mediatypes = set()
220 if self._caps is not None:
216 if self._caps is not None:
221 mt = self.capable('httpmediatype')
217 mt = self.capable('httpmediatype')
222 if mt:
218 if mt:
223 protoparams.append('0.1')
219 protoparams.append('0.1')
224 mediatypes = set(mt.split(','))
220 mediatypes = set(mt.split(','))
225
221
226 if '0.2tx' in mediatypes:
222 if '0.2tx' in mediatypes:
227 protoparams.append('0.2')
223 protoparams.append('0.2')
228
224
229 if '0.2tx' in mediatypes and self.capable('compression'):
225 if '0.2tx' in mediatypes and self.capable('compression'):
230 # We /could/ compare supported compression formats and prune
226 # We /could/ compare supported compression formats and prune
231 # non-mutually supported or error if nothing is mutually supported.
227 # non-mutually supported or error if nothing is mutually supported.
232 # For now, send the full list to the server and have it error.
228 # For now, send the full list to the server and have it error.
233 comps = [e.wireprotosupport().name for e in
229 comps = [e.wireprotosupport().name for e in
234 util.compengines.supportedwireengines(util.CLIENTROLE)]
230 util.compengines.supportedwireengines(util.CLIENTROLE)]
235 protoparams.append('comp=%s' % ','.join(comps))
231 protoparams.append('comp=%s' % ','.join(comps))
236
232
237 if protoparams:
233 if protoparams:
238 protoheaders = encodevalueinheaders(' '.join(protoparams),
234 protoheaders = encodevalueinheaders(' '.join(protoparams),
239 'X-HgProto',
235 'X-HgProto',
240 headersize or 1024)
236 headersize or 1024)
241 for header, value in protoheaders:
237 for header, value in protoheaders:
242 headers[header] = value
238 headers[header] = value
243 varyheaders.append(header)
239 varyheaders.append(header)
244
240
245 if varyheaders:
241 if varyheaders:
246 headers['Vary'] = ','.join(varyheaders)
242 headers['Vary'] = ','.join(varyheaders)
247
243
248 req = self._requestbuilder(cu, data, headers)
244 req = self._requestbuilder(cu, data, headers)
249
245
250 if data is not None:
246 if data is not None:
251 self.ui.debug("sending %s bytes\n" % size)
247 self.ui.debug("sending %s bytes\n" % size)
252 req.add_unredirected_header('Content-Length', '%d' % size)
248 req.add_unredirected_header('Content-Length', '%d' % size)
253 try:
249 try:
254 resp = self._urlopener.open(req)
250 resp = self._urlopener.open(req)
255 except urlerr.httperror as inst:
251 except urlerr.httperror as inst:
256 if inst.code == 401:
252 if inst.code == 401:
257 raise error.Abort(_('authorization failed'))
253 raise error.Abort(_('authorization failed'))
258 raise
254 raise
259 except httplib.HTTPException as inst:
255 except httplib.HTTPException as inst:
260 self.ui.debug('http error while sending %s command\n' % cmd)
256 self.ui.debug('http error while sending %s command\n' % cmd)
261 self.ui.traceback()
257 self.ui.traceback()
262 raise IOError(None, inst)
258 raise IOError(None, inst)
263
259
264 # Insert error handlers for common I/O failures.
260 # Insert error handlers for common I/O failures.
265 _wraphttpresponse(resp)
261 _wraphttpresponse(resp)
266
262
267 # record the url we got redirected to
263 # record the url we got redirected to
268 resp_url = resp.geturl()
264 resp_url = resp.geturl()
269 if resp_url.endswith(qs):
265 if resp_url.endswith(qs):
270 resp_url = resp_url[:-len(qs)]
266 resp_url = resp_url[:-len(qs)]
271 if self._url.rstrip('/') != resp_url.rstrip('/'):
267 if self._url.rstrip('/') != resp_url.rstrip('/'):
272 if not self.ui.quiet:
268 if not self.ui.quiet:
273 self.ui.warn(_('real URL is %s\n') % resp_url)
269 self.ui.warn(_('real URL is %s\n') % resp_url)
274 self._url = resp_url
270 self._url = resp_url
275 try:
271 try:
276 proto = resp.getheader('content-type')
272 proto = resp.getheader('content-type')
277 except AttributeError:
273 except AttributeError:
278 proto = resp.headers.get('content-type', '')
274 proto = resp.headers.get('content-type', '')
279
275
280 safeurl = util.hidepassword(self._url)
276 safeurl = util.hidepassword(self._url)
281 if proto.startswith('application/hg-error'):
277 if proto.startswith('application/hg-error'):
282 raise error.OutOfBandError(resp.read())
278 raise error.OutOfBandError(resp.read())
283 # accept old "text/plain" and "application/hg-changegroup" for now
279 # accept old "text/plain" and "application/hg-changegroup" for now
284 if not (proto.startswith('application/mercurial-') or
280 if not (proto.startswith('application/mercurial-') or
285 (proto.startswith('text/plain')
281 (proto.startswith('text/plain')
286 and not resp.headers.get('content-length')) or
282 and not resp.headers.get('content-length')) or
287 proto.startswith('application/hg-changegroup')):
283 proto.startswith('application/hg-changegroup')):
288 self.ui.debug("requested URL: '%s'\n" % util.hidepassword(cu))
284 self.ui.debug("requested URL: '%s'\n" % util.hidepassword(cu))
289 raise error.RepoError(
285 raise error.RepoError(
290 _("'%s' does not appear to be an hg repository:\n"
286 _("'%s' does not appear to be an hg repository:\n"
291 "---%%<--- (%s)\n%s\n---%%<---\n")
287 "---%%<--- (%s)\n%s\n---%%<---\n")
292 % (safeurl, proto or 'no content-type', resp.read(1024)))
288 % (safeurl, proto or 'no content-type', resp.read(1024)))
293
289
294 if proto.startswith('application/mercurial-'):
290 if proto.startswith('application/mercurial-'):
295 try:
291 try:
296 version = proto.split('-', 1)[1]
292 version = proto.split('-', 1)[1]
297 version_info = tuple([int(n) for n in version.split('.')])
293 version_info = tuple([int(n) for n in version.split('.')])
298 except ValueError:
294 except ValueError:
299 raise error.RepoError(_("'%s' sent a broken Content-Type "
295 raise error.RepoError(_("'%s' sent a broken Content-Type "
300 "header (%s)") % (safeurl, proto))
296 "header (%s)") % (safeurl, proto))
301
297
302 # TODO consider switching to a decompression reader that uses
298 # TODO consider switching to a decompression reader that uses
303 # generators.
299 # generators.
304 if version_info == (0, 1):
300 if version_info == (0, 1):
305 if _compressible:
301 if _compressible:
306 return util.compengines['zlib'].decompressorreader(resp)
302 return util.compengines['zlib'].decompressorreader(resp)
307 return resp
303 return resp
308 elif version_info == (0, 2):
304 elif version_info == (0, 2):
309 # application/mercurial-0.2 always identifies the compression
305 # application/mercurial-0.2 always identifies the compression
310 # engine in the payload header.
306 # engine in the payload header.
311 elen = struct.unpack('B', resp.read(1))[0]
307 elen = struct.unpack('B', resp.read(1))[0]
312 ename = resp.read(elen)
308 ename = resp.read(elen)
313 engine = util.compengines.forwiretype(ename)
309 engine = util.compengines.forwiretype(ename)
314 return engine.decompressorreader(resp)
310 return engine.decompressorreader(resp)
315 else:
311 else:
316 raise error.RepoError(_("'%s' uses newer protocol %s") %
312 raise error.RepoError(_("'%s' uses newer protocol %s") %
317 (safeurl, version))
313 (safeurl, version))
318
314
319 if _compressible:
315 if _compressible:
320 return util.compengines['zlib'].decompressorreader(resp)
316 return util.compengines['zlib'].decompressorreader(resp)
321
317
322 return resp
318 return resp
323
319
324 def _call(self, cmd, **args):
320 def _call(self, cmd, **args):
325 fp = self._callstream(cmd, **args)
321 fp = self._callstream(cmd, **args)
326 try:
322 try:
327 return fp.read()
323 return fp.read()
328 finally:
324 finally:
329 # if using keepalive, allow connection to be reused
325 # if using keepalive, allow connection to be reused
330 fp.close()
326 fp.close()
331
327
332 def _callpush(self, cmd, cg, **args):
328 def _callpush(self, cmd, cg, **args):
333 # have to stream bundle to a temp file because we do not have
329 # have to stream bundle to a temp file because we do not have
334 # http 1.1 chunked transfer.
330 # http 1.1 chunked transfer.
335
331
336 types = self.capable('unbundle')
332 types = self.capable('unbundle')
337 try:
333 try:
338 types = types.split(',')
334 types = types.split(',')
339 except AttributeError:
335 except AttributeError:
340 # servers older than d1b16a746db6 will send 'unbundle' as a
336 # servers older than d1b16a746db6 will send 'unbundle' as a
341 # boolean capability. They only support headerless/uncompressed
337 # boolean capability. They only support headerless/uncompressed
342 # bundles.
338 # bundles.
343 types = [""]
339 types = [""]
344 for x in types:
340 for x in types:
345 if x in bundle2.bundletypes:
341 if x in bundle2.bundletypes:
346 type = x
342 type = x
347 break
343 break
348
344
349 tempname = bundle2.writebundle(self.ui, cg, None, type)
345 tempname = bundle2.writebundle(self.ui, cg, None, type)
350 fp = httpconnection.httpsendfile(self.ui, tempname, "rb")
346 fp = httpconnection.httpsendfile(self.ui, tempname, "rb")
351 headers = {'Content-Type': 'application/mercurial-0.1'}
347 headers = {'Content-Type': 'application/mercurial-0.1'}
352
348
353 try:
349 try:
354 r = self._call(cmd, data=fp, headers=headers, **args)
350 r = self._call(cmd, data=fp, headers=headers, **args)
355 vals = r.split('\n', 1)
351 vals = r.split('\n', 1)
356 if len(vals) < 2:
352 if len(vals) < 2:
357 raise error.ResponseError(_("unexpected response:"), r)
353 raise error.ResponseError(_("unexpected response:"), r)
358 return vals
354 return vals
359 except socket.error as err:
355 except socket.error as err:
360 if err.args[0] in (errno.ECONNRESET, errno.EPIPE):
356 if err.args[0] in (errno.ECONNRESET, errno.EPIPE):
361 raise error.Abort(_('push failed: %s') % err.args[1])
357 raise error.Abort(_('push failed: %s') % err.args[1])
362 raise error.Abort(err.args[1])
358 raise error.Abort(err.args[1])
363 finally:
359 finally:
364 fp.close()
360 fp.close()
365 os.unlink(tempname)
361 os.unlink(tempname)
366
362
367 def _calltwowaystream(self, cmd, fp, **args):
363 def _calltwowaystream(self, cmd, fp, **args):
368 fh = None
364 fh = None
369 fp_ = None
365 fp_ = None
370 filename = None
366 filename = None
371 try:
367 try:
372 # dump bundle to disk
368 # dump bundle to disk
373 fd, filename = tempfile.mkstemp(prefix="hg-bundle-", suffix=".hg")
369 fd, filename = tempfile.mkstemp(prefix="hg-bundle-", suffix=".hg")
374 fh = os.fdopen(fd, pycompat.sysstr("wb"))
370 fh = os.fdopen(fd, pycompat.sysstr("wb"))
375 d = fp.read(4096)
371 d = fp.read(4096)
376 while d:
372 while d:
377 fh.write(d)
373 fh.write(d)
378 d = fp.read(4096)
374 d = fp.read(4096)
379 fh.close()
375 fh.close()
380 # start http push
376 # start http push
381 fp_ = httpconnection.httpsendfile(self.ui, filename, "rb")
377 fp_ = httpconnection.httpsendfile(self.ui, filename, "rb")
382 headers = {'Content-Type': 'application/mercurial-0.1'}
378 headers = {'Content-Type': 'application/mercurial-0.1'}
383 return self._callstream(cmd, data=fp_, headers=headers, **args)
379 return self._callstream(cmd, data=fp_, headers=headers, **args)
384 finally:
380 finally:
385 if fp_ is not None:
381 if fp_ is not None:
386 fp_.close()
382 fp_.close()
387 if fh is not None:
383 if fh is not None:
388 fh.close()
384 fh.close()
389 os.unlink(filename)
385 os.unlink(filename)
390
386
391 def _callcompressable(self, cmd, **args):
387 def _callcompressable(self, cmd, **args):
392 return self._callstream(cmd, _compressible=True, **args)
388 return self._callstream(cmd, _compressible=True, **args)
393
389
394 def _abort(self, exception):
390 def _abort(self, exception):
395 raise exception
391 raise exception
396
392
397 class httpspeer(httppeer):
393 class httpspeer(httppeer):
398 def __init__(self, ui, path):
394 def __init__(self, ui, path):
399 if not url.has_https:
395 if not url.has_https:
400 raise error.Abort(_('Python support for SSL and HTTPS '
396 raise error.Abort(_('Python support for SSL and HTTPS '
401 'is not installed'))
397 'is not installed'))
402 httppeer.__init__(self, ui, path)
398 httppeer.__init__(self, ui, path)
403
399
404 def instance(ui, path, create):
400 def instance(ui, path, create):
405 if create:
401 if create:
406 raise error.Abort(_('cannot create new http repository'))
402 raise error.Abort(_('cannot create new http repository'))
407 try:
403 try:
408 if path.startswith('https:'):
404 if path.startswith('https:'):
409 inst = httpspeer(ui, path)
405 inst = httpspeer(ui, path)
410 else:
406 else:
411 inst = httppeer(ui, path)
407 inst = httppeer(ui, path)
412 try:
408 try:
413 # Try to do useful work when checking compatibility.
409 # Try to do useful work when checking compatibility.
414 # Usually saves a roundtrip since we want the caps anyway.
410 # Usually saves a roundtrip since we want the caps anyway.
415 inst._fetchcaps()
411 inst._fetchcaps()
416 except error.RepoError:
412 except error.RepoError:
417 # No luck, try older compatibility check.
413 # No luck, try older compatibility check.
418 inst.between([(nullid, nullid)])
414 inst.between([(nullid, nullid)])
419 return inst
415 return inst
420 except error.RepoError as httpexception:
416 except error.RepoError as httpexception:
421 try:
417 try:
422 r = statichttprepo.instance(ui, "static-" + path, create)
418 r = statichttprepo.instance(ui, "static-" + path, create)
423 ui.note(_('(falling back to static-http)\n'))
419 ui.note(_('(falling back to static-http)\n'))
424 return r
420 return r
425 except error.RepoError:
421 except error.RepoError:
426 raise httpexception # use the original http RepoError instead
422 raise httpexception # use the original http RepoError instead
@@ -1,140 +1,96 b''
1 # peer.py - repository base classes for mercurial
1 # peer.py - repository base classes for mercurial
2 #
2 #
3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
4 # Copyright 2006 Vadim Gelfer <vadim.gelfer@gmail.com>
4 # Copyright 2006 Vadim Gelfer <vadim.gelfer@gmail.com>
5 #
5 #
6 # This software may be used and distributed according to the terms of the
6 # This software may be used and distributed according to the terms of the
7 # GNU General Public License version 2 or any later version.
7 # GNU General Public License version 2 or any later version.
8
8
9 from __future__ import absolute_import
9 from __future__ import absolute_import
10
10
11 from .i18n import _
12 from . import (
11 from . import (
13 error,
12 error,
14 util,
13 util,
15 )
14 )
16
15
17 # abstract batching support
16 # abstract batching support
18
17
19 class future(object):
18 class future(object):
20 '''placeholder for a value to be set later'''
19 '''placeholder for a value to be set later'''
21 def set(self, value):
20 def set(self, value):
22 if util.safehasattr(self, 'value'):
21 if util.safehasattr(self, 'value'):
23 raise error.RepoError("future is already set")
22 raise error.RepoError("future is already set")
24 self.value = value
23 self.value = value
25
24
26 class batcher(object):
25 class batcher(object):
27 '''base class for batches of commands submittable in a single request
26 '''base class for batches of commands submittable in a single request
28
27
29 All methods invoked on instances of this class are simply queued and
28 All methods invoked on instances of this class are simply queued and
30 return a a future for the result. Once you call submit(), all the queued
29 return a a future for the result. Once you call submit(), all the queued
31 calls are performed and the results set in their respective futures.
30 calls are performed and the results set in their respective futures.
32 '''
31 '''
33 def __init__(self):
32 def __init__(self):
34 self.calls = []
33 self.calls = []
35 def __getattr__(self, name):
34 def __getattr__(self, name):
36 def call(*args, **opts):
35 def call(*args, **opts):
37 resref = future()
36 resref = future()
38 self.calls.append((name, args, opts, resref,))
37 self.calls.append((name, args, opts, resref,))
39 return resref
38 return resref
40 return call
39 return call
41 def submit(self):
40 def submit(self):
42 raise NotImplementedError()
41 raise NotImplementedError()
43
42
44 class iterbatcher(batcher):
43 class iterbatcher(batcher):
45
44
46 def submit(self):
45 def submit(self):
47 raise NotImplementedError()
46 raise NotImplementedError()
48
47
49 def results(self):
48 def results(self):
50 raise NotImplementedError()
49 raise NotImplementedError()
51
50
52 class localiterbatcher(iterbatcher):
51 class localiterbatcher(iterbatcher):
53 def __init__(self, local):
52 def __init__(self, local):
54 super(iterbatcher, self).__init__()
53 super(iterbatcher, self).__init__()
55 self.local = local
54 self.local = local
56
55
57 def submit(self):
56 def submit(self):
58 # submit for a local iter batcher is a noop
57 # submit for a local iter batcher is a noop
59 pass
58 pass
60
59
61 def results(self):
60 def results(self):
62 for name, args, opts, resref in self.calls:
61 for name, args, opts, resref in self.calls:
63 resref.set(getattr(self.local, name)(*args, **opts))
62 resref.set(getattr(self.local, name)(*args, **opts))
64 yield resref.value
63 yield resref.value
65
64
66 def batchable(f):
65 def batchable(f):
67 '''annotation for batchable methods
66 '''annotation for batchable methods
68
67
69 Such methods must implement a coroutine as follows:
68 Such methods must implement a coroutine as follows:
70
69
71 @batchable
70 @batchable
72 def sample(self, one, two=None):
71 def sample(self, one, two=None):
73 # Build list of encoded arguments suitable for your wire protocol:
72 # Build list of encoded arguments suitable for your wire protocol:
74 encargs = [('one', encode(one),), ('two', encode(two),)]
73 encargs = [('one', encode(one),), ('two', encode(two),)]
75 # Create future for injection of encoded result:
74 # Create future for injection of encoded result:
76 encresref = future()
75 encresref = future()
77 # Return encoded arguments and future:
76 # Return encoded arguments and future:
78 yield encargs, encresref
77 yield encargs, encresref
79 # Assuming the future to be filled with the result from the batched
78 # Assuming the future to be filled with the result from the batched
80 # request now. Decode it:
79 # request now. Decode it:
81 yield decode(encresref.value)
80 yield decode(encresref.value)
82
81
83 The decorator returns a function which wraps this coroutine as a plain
82 The decorator returns a function which wraps this coroutine as a plain
84 method, but adds the original method as an attribute called "batchable",
83 method, but adds the original method as an attribute called "batchable",
85 which is used by remotebatch to split the call into separate encoding and
84 which is used by remotebatch to split the call into separate encoding and
86 decoding phases.
85 decoding phases.
87 '''
86 '''
88 def plain(*args, **opts):
87 def plain(*args, **opts):
89 batchable = f(*args, **opts)
88 batchable = f(*args, **opts)
90 encargsorres, encresref = next(batchable)
89 encargsorres, encresref = next(batchable)
91 if not encresref:
90 if not encresref:
92 return encargsorres # a local result in this case
91 return encargsorres # a local result in this case
93 self = args[0]
92 self = args[0]
94 encresref.set(self._submitone(f.func_name, encargsorres))
93 encresref.set(self._submitone(f.func_name, encargsorres))
95 return next(batchable)
94 return next(batchable)
96 setattr(plain, 'batchable', f)
95 setattr(plain, 'batchable', f)
97 return plain
96 return plain
98
99 class peerrepository(object):
100 def iterbatch(self):
101 """Batch requests but allow iterating over the results.
102
103 This is to allow interleaving responses with things like
104 progress updates for clients.
105 """
106 return localiterbatcher(self)
107
108 def capable(self, name):
109 '''tell whether repo supports named capability.
110 return False if not supported.
111 if boolean capability, return True.
112 if string capability, return string.'''
113 caps = self._capabilities()
114 if name in caps:
115 return True
116 name_eq = name + '='
117 for cap in caps:
118 if cap.startswith(name_eq):
119 return cap[len(name_eq):]
120 return False
121
122 def requirecap(self, name, purpose):
123 '''raise an exception if the given capability is not present'''
124 if not self.capable(name):
125 raise error.CapabilityError(
126 _('cannot %s; remote repository does not '
127 'support the %r capability') % (purpose, name))
128
129 def local(self):
130 '''return peer as a localrepo, or None'''
131 return None
132
133 def peer(self):
134 return self
135
136 def canpush(self):
137 return True
138
139 def close(self):
140 pass
@@ -1,353 +1,349 b''
1 # sshpeer.py - ssh repository proxy class for mercurial
1 # sshpeer.py - ssh repository proxy class for mercurial
2 #
2 #
3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import re
10 import re
11
11
12 from .i18n import _
12 from .i18n import _
13 from . import (
13 from . import (
14 error,
14 error,
15 pycompat,
15 pycompat,
16 repository,
17 util,
16 util,
18 wireproto,
17 wireproto,
19 )
18 )
20
19
21 def _serverquote(s):
20 def _serverquote(s):
22 if not s:
21 if not s:
23 return s
22 return s
24 '''quote a string for the remote shell ... which we assume is sh'''
23 '''quote a string for the remote shell ... which we assume is sh'''
25 if re.match('[a-zA-Z0-9@%_+=:,./-]*$', s):
24 if re.match('[a-zA-Z0-9@%_+=:,./-]*$', s):
26 return s
25 return s
27 return "'%s'" % s.replace("'", "'\\''")
26 return "'%s'" % s.replace("'", "'\\''")
28
27
29 def _forwardoutput(ui, pipe):
28 def _forwardoutput(ui, pipe):
30 """display all data currently available on pipe as remote output.
29 """display all data currently available on pipe as remote output.
31
30
32 This is non blocking."""
31 This is non blocking."""
33 s = util.readpipe(pipe)
32 s = util.readpipe(pipe)
34 if s:
33 if s:
35 for l in s.splitlines():
34 for l in s.splitlines():
36 ui.status(_("remote: "), l, '\n')
35 ui.status(_("remote: "), l, '\n')
37
36
38 class doublepipe(object):
37 class doublepipe(object):
39 """Operate a side-channel pipe in addition of a main one
38 """Operate a side-channel pipe in addition of a main one
40
39
41 The side-channel pipe contains server output to be forwarded to the user
40 The side-channel pipe contains server output to be forwarded to the user
42 input. The double pipe will behave as the "main" pipe, but will ensure the
41 input. The double pipe will behave as the "main" pipe, but will ensure the
43 content of the "side" pipe is properly processed while we wait for blocking
42 content of the "side" pipe is properly processed while we wait for blocking
44 call on the "main" pipe.
43 call on the "main" pipe.
45
44
46 If large amounts of data are read from "main", the forward will cease after
45 If large amounts of data are read from "main", the forward will cease after
47 the first bytes start to appear. This simplifies the implementation
46 the first bytes start to appear. This simplifies the implementation
48 without affecting actual output of sshpeer too much as we rarely issue
47 without affecting actual output of sshpeer too much as we rarely issue
49 large read for data not yet emitted by the server.
48 large read for data not yet emitted by the server.
50
49
51 The main pipe is expected to be a 'bufferedinputpipe' from the util module
50 The main pipe is expected to be a 'bufferedinputpipe' from the util module
52 that handle all the os specific bits. This class lives in this module
51 that handle all the os specific bits. This class lives in this module
53 because it focus on behavior specific to the ssh protocol."""
52 because it focus on behavior specific to the ssh protocol."""
54
53
55 def __init__(self, ui, main, side):
54 def __init__(self, ui, main, side):
56 self._ui = ui
55 self._ui = ui
57 self._main = main
56 self._main = main
58 self._side = side
57 self._side = side
59
58
60 def _wait(self):
59 def _wait(self):
61 """wait until some data are available on main or side
60 """wait until some data are available on main or side
62
61
63 return a pair of boolean (ismainready, issideready)
62 return a pair of boolean (ismainready, issideready)
64
63
65 (This will only wait for data if the setup is supported by `util.poll`)
64 (This will only wait for data if the setup is supported by `util.poll`)
66 """
65 """
67 if getattr(self._main, 'hasbuffer', False): # getattr for classic pipe
66 if getattr(self._main, 'hasbuffer', False): # getattr for classic pipe
68 return (True, True) # main has data, assume side is worth poking at.
67 return (True, True) # main has data, assume side is worth poking at.
69 fds = [self._main.fileno(), self._side.fileno()]
68 fds = [self._main.fileno(), self._side.fileno()]
70 try:
69 try:
71 act = util.poll(fds)
70 act = util.poll(fds)
72 except NotImplementedError:
71 except NotImplementedError:
73 # non supported yet case, assume all have data.
72 # non supported yet case, assume all have data.
74 act = fds
73 act = fds
75 return (self._main.fileno() in act, self._side.fileno() in act)
74 return (self._main.fileno() in act, self._side.fileno() in act)
76
75
77 def write(self, data):
76 def write(self, data):
78 return self._call('write', data)
77 return self._call('write', data)
79
78
80 def read(self, size):
79 def read(self, size):
81 r = self._call('read', size)
80 r = self._call('read', size)
82 if size != 0 and not r:
81 if size != 0 and not r:
83 # We've observed a condition that indicates the
82 # We've observed a condition that indicates the
84 # stdout closed unexpectedly. Check stderr one
83 # stdout closed unexpectedly. Check stderr one
85 # more time and snag anything that's there before
84 # more time and snag anything that's there before
86 # letting anyone know the main part of the pipe
85 # letting anyone know the main part of the pipe
87 # closed prematurely.
86 # closed prematurely.
88 _forwardoutput(self._ui, self._side)
87 _forwardoutput(self._ui, self._side)
89 return r
88 return r
90
89
91 def readline(self):
90 def readline(self):
92 return self._call('readline')
91 return self._call('readline')
93
92
94 def _call(self, methname, data=None):
93 def _call(self, methname, data=None):
95 """call <methname> on "main", forward output of "side" while blocking
94 """call <methname> on "main", forward output of "side" while blocking
96 """
95 """
97 # data can be '' or 0
96 # data can be '' or 0
98 if (data is not None and not data) or self._main.closed:
97 if (data is not None and not data) or self._main.closed:
99 _forwardoutput(self._ui, self._side)
98 _forwardoutput(self._ui, self._side)
100 return ''
99 return ''
101 while True:
100 while True:
102 mainready, sideready = self._wait()
101 mainready, sideready = self._wait()
103 if sideready:
102 if sideready:
104 _forwardoutput(self._ui, self._side)
103 _forwardoutput(self._ui, self._side)
105 if mainready:
104 if mainready:
106 meth = getattr(self._main, methname)
105 meth = getattr(self._main, methname)
107 if data is None:
106 if data is None:
108 return meth()
107 return meth()
109 else:
108 else:
110 return meth(data)
109 return meth(data)
111
110
112 def close(self):
111 def close(self):
113 return self._main.close()
112 return self._main.close()
114
113
115 def flush(self):
114 def flush(self):
116 return self._main.flush()
115 return self._main.flush()
117
116
118 class sshpeer(wireproto.wirepeer, repository.legacypeer):
117 class sshpeer(wireproto.wirepeer):
119 def __init__(self, ui, path, create=False):
118 def __init__(self, ui, path, create=False):
120 self._url = path
119 self._url = path
121 self._ui = ui
120 self._ui = ui
122 self._pipeo = self._pipei = self._pipee = None
121 self._pipeo = self._pipei = self._pipee = None
123
122
124 u = util.url(path, parsequery=False, parsefragment=False)
123 u = util.url(path, parsequery=False, parsefragment=False)
125 if u.scheme != 'ssh' or not u.host or u.path is None:
124 if u.scheme != 'ssh' or not u.host or u.path is None:
126 self._abort(error.RepoError(_("couldn't parse location %s") % path))
125 self._abort(error.RepoError(_("couldn't parse location %s") % path))
127
126
128 util.checksafessh(path)
127 util.checksafessh(path)
129
128
130 if u.passwd is not None:
129 if u.passwd is not None:
131 self._abort(error.RepoError(_("password in URL not supported")))
130 self._abort(error.RepoError(_("password in URL not supported")))
132
131
133 self._user = u.user
132 self._user = u.user
134 self._host = u.host
133 self._host = u.host
135 self._port = u.port
134 self._port = u.port
136 self._path = u.path or '.'
135 self._path = u.path or '.'
137
136
138 sshcmd = self.ui.config("ui", "ssh")
137 sshcmd = self.ui.config("ui", "ssh")
139 remotecmd = self.ui.config("ui", "remotecmd")
138 remotecmd = self.ui.config("ui", "remotecmd")
140
139
141 args = util.sshargs(sshcmd, self._host, self._user, self._port)
140 args = util.sshargs(sshcmd, self._host, self._user, self._port)
142
141
143 if create:
142 if create:
144 cmd = '%s %s %s' % (sshcmd, args,
143 cmd = '%s %s %s' % (sshcmd, args,
145 util.shellquote("%s init %s" %
144 util.shellquote("%s init %s" %
146 (_serverquote(remotecmd), _serverquote(self._path))))
145 (_serverquote(remotecmd), _serverquote(self._path))))
147 ui.debug('running %s\n' % cmd)
146 ui.debug('running %s\n' % cmd)
148 res = ui.system(cmd, blockedtag='sshpeer')
147 res = ui.system(cmd, blockedtag='sshpeer')
149 if res != 0:
148 if res != 0:
150 self._abort(error.RepoError(_("could not create remote repo")))
149 self._abort(error.RepoError(_("could not create remote repo")))
151
150
152 self._validaterepo(sshcmd, args, remotecmd)
151 self._validaterepo(sshcmd, args, remotecmd)
153
152
154 # TODO remove this alias once peerrepository inheritance is removed.
155 self._capabilities = self.capabilities
156
157 # Begin of _basepeer interface.
153 # Begin of _basepeer interface.
158
154
159 @util.propertycache
155 @util.propertycache
160 def ui(self):
156 def ui(self):
161 return self._ui
157 return self._ui
162
158
163 def url(self):
159 def url(self):
164 return self._url
160 return self._url
165
161
166 def local(self):
162 def local(self):
167 return None
163 return None
168
164
169 def peer(self):
165 def peer(self):
170 return self
166 return self
171
167
172 def canpush(self):
168 def canpush(self):
173 return True
169 return True
174
170
175 def close(self):
171 def close(self):
176 pass
172 pass
177
173
178 # End of _basepeer interface.
174 # End of _basepeer interface.
179
175
180 # Begin of _basewirecommands interface.
176 # Begin of _basewirecommands interface.
181
177
182 def capabilities(self):
178 def capabilities(self):
183 return self._caps
179 return self._caps
184
180
185 # End of _basewirecommands interface.
181 # End of _basewirecommands interface.
186
182
187 def _validaterepo(self, sshcmd, args, remotecmd):
183 def _validaterepo(self, sshcmd, args, remotecmd):
188 # cleanup up previous run
184 # cleanup up previous run
189 self._cleanup()
185 self._cleanup()
190
186
191 cmd = '%s %s %s' % (sshcmd, args,
187 cmd = '%s %s %s' % (sshcmd, args,
192 util.shellquote("%s -R %s serve --stdio" %
188 util.shellquote("%s -R %s serve --stdio" %
193 (_serverquote(remotecmd), _serverquote(self._path))))
189 (_serverquote(remotecmd), _serverquote(self._path))))
194 self.ui.debug('running %s\n' % cmd)
190 self.ui.debug('running %s\n' % cmd)
195 cmd = util.quotecommand(cmd)
191 cmd = util.quotecommand(cmd)
196
192
197 # while self._subprocess isn't used, having it allows the subprocess to
193 # while self._subprocess isn't used, having it allows the subprocess to
198 # to clean up correctly later
194 # to clean up correctly later
199 #
195 #
200 # no buffer allow the use of 'select'
196 # no buffer allow the use of 'select'
201 # feel free to remove buffering and select usage when we ultimately
197 # feel free to remove buffering and select usage when we ultimately
202 # move to threading.
198 # move to threading.
203 sub = util.popen4(cmd, bufsize=0)
199 sub = util.popen4(cmd, bufsize=0)
204 self._pipeo, self._pipei, self._pipee, self._subprocess = sub
200 self._pipeo, self._pipei, self._pipee, self._subprocess = sub
205
201
206 self._pipei = util.bufferedinputpipe(self._pipei)
202 self._pipei = util.bufferedinputpipe(self._pipei)
207 self._pipei = doublepipe(self.ui, self._pipei, self._pipee)
203 self._pipei = doublepipe(self.ui, self._pipei, self._pipee)
208 self._pipeo = doublepipe(self.ui, self._pipeo, self._pipee)
204 self._pipeo = doublepipe(self.ui, self._pipeo, self._pipee)
209
205
210 # skip any noise generated by remote shell
206 # skip any noise generated by remote shell
211 self._callstream("hello")
207 self._callstream("hello")
212 r = self._callstream("between", pairs=("%s-%s" % ("0"*40, "0"*40)))
208 r = self._callstream("between", pairs=("%s-%s" % ("0"*40, "0"*40)))
213 lines = ["", "dummy"]
209 lines = ["", "dummy"]
214 max_noise = 500
210 max_noise = 500
215 while lines[-1] and max_noise:
211 while lines[-1] and max_noise:
216 l = r.readline()
212 l = r.readline()
217 self._readerr()
213 self._readerr()
218 if lines[-1] == "1\n" and l == "\n":
214 if lines[-1] == "1\n" and l == "\n":
219 break
215 break
220 if l:
216 if l:
221 self.ui.debug("remote: ", l)
217 self.ui.debug("remote: ", l)
222 lines.append(l)
218 lines.append(l)
223 max_noise -= 1
219 max_noise -= 1
224 else:
220 else:
225 self._abort(error.RepoError(_('no suitable response from '
221 self._abort(error.RepoError(_('no suitable response from '
226 'remote hg')))
222 'remote hg')))
227
223
228 self._caps = set()
224 self._caps = set()
229 for l in reversed(lines):
225 for l in reversed(lines):
230 if l.startswith("capabilities:"):
226 if l.startswith("capabilities:"):
231 self._caps.update(l[:-1].split(":")[1].split())
227 self._caps.update(l[:-1].split(":")[1].split())
232 break
228 break
233
229
234 def _readerr(self):
230 def _readerr(self):
235 _forwardoutput(self.ui, self._pipee)
231 _forwardoutput(self.ui, self._pipee)
236
232
237 def _abort(self, exception):
233 def _abort(self, exception):
238 self._cleanup()
234 self._cleanup()
239 raise exception
235 raise exception
240
236
241 def _cleanup(self):
237 def _cleanup(self):
242 if self._pipeo is None:
238 if self._pipeo is None:
243 return
239 return
244 self._pipeo.close()
240 self._pipeo.close()
245 self._pipei.close()
241 self._pipei.close()
246 try:
242 try:
247 # read the error descriptor until EOF
243 # read the error descriptor until EOF
248 for l in self._pipee:
244 for l in self._pipee:
249 self.ui.status(_("remote: "), l)
245 self.ui.status(_("remote: "), l)
250 except (IOError, ValueError):
246 except (IOError, ValueError):
251 pass
247 pass
252 self._pipee.close()
248 self._pipee.close()
253
249
254 __del__ = _cleanup
250 __del__ = _cleanup
255
251
256 def _submitbatch(self, req):
252 def _submitbatch(self, req):
257 rsp = self._callstream("batch", cmds=wireproto.encodebatchcmds(req))
253 rsp = self._callstream("batch", cmds=wireproto.encodebatchcmds(req))
258 available = self._getamount()
254 available = self._getamount()
259 # TODO this response parsing is probably suboptimal for large
255 # TODO this response parsing is probably suboptimal for large
260 # batches with large responses.
256 # batches with large responses.
261 toread = min(available, 1024)
257 toread = min(available, 1024)
262 work = rsp.read(toread)
258 work = rsp.read(toread)
263 available -= toread
259 available -= toread
264 chunk = work
260 chunk = work
265 while chunk:
261 while chunk:
266 while ';' in work:
262 while ';' in work:
267 one, work = work.split(';', 1)
263 one, work = work.split(';', 1)
268 yield wireproto.unescapearg(one)
264 yield wireproto.unescapearg(one)
269 toread = min(available, 1024)
265 toread = min(available, 1024)
270 chunk = rsp.read(toread)
266 chunk = rsp.read(toread)
271 available -= toread
267 available -= toread
272 work += chunk
268 work += chunk
273 yield wireproto.unescapearg(work)
269 yield wireproto.unescapearg(work)
274
270
275 def _callstream(self, cmd, **args):
271 def _callstream(self, cmd, **args):
276 args = pycompat.byteskwargs(args)
272 args = pycompat.byteskwargs(args)
277 self.ui.debug("sending %s command\n" % cmd)
273 self.ui.debug("sending %s command\n" % cmd)
278 self._pipeo.write("%s\n" % cmd)
274 self._pipeo.write("%s\n" % cmd)
279 _func, names = wireproto.commands[cmd]
275 _func, names = wireproto.commands[cmd]
280 keys = names.split()
276 keys = names.split()
281 wireargs = {}
277 wireargs = {}
282 for k in keys:
278 for k in keys:
283 if k == '*':
279 if k == '*':
284 wireargs['*'] = args
280 wireargs['*'] = args
285 break
281 break
286 else:
282 else:
287 wireargs[k] = args[k]
283 wireargs[k] = args[k]
288 del args[k]
284 del args[k]
289 for k, v in sorted(wireargs.iteritems()):
285 for k, v in sorted(wireargs.iteritems()):
290 self._pipeo.write("%s %d\n" % (k, len(v)))
286 self._pipeo.write("%s %d\n" % (k, len(v)))
291 if isinstance(v, dict):
287 if isinstance(v, dict):
292 for dk, dv in v.iteritems():
288 for dk, dv in v.iteritems():
293 self._pipeo.write("%s %d\n" % (dk, len(dv)))
289 self._pipeo.write("%s %d\n" % (dk, len(dv)))
294 self._pipeo.write(dv)
290 self._pipeo.write(dv)
295 else:
291 else:
296 self._pipeo.write(v)
292 self._pipeo.write(v)
297 self._pipeo.flush()
293 self._pipeo.flush()
298
294
299 return self._pipei
295 return self._pipei
300
296
301 def _callcompressable(self, cmd, **args):
297 def _callcompressable(self, cmd, **args):
302 return self._callstream(cmd, **args)
298 return self._callstream(cmd, **args)
303
299
304 def _call(self, cmd, **args):
300 def _call(self, cmd, **args):
305 self._callstream(cmd, **args)
301 self._callstream(cmd, **args)
306 return self._recv()
302 return self._recv()
307
303
308 def _callpush(self, cmd, fp, **args):
304 def _callpush(self, cmd, fp, **args):
309 r = self._call(cmd, **args)
305 r = self._call(cmd, **args)
310 if r:
306 if r:
311 return '', r
307 return '', r
312 for d in iter(lambda: fp.read(4096), ''):
308 for d in iter(lambda: fp.read(4096), ''):
313 self._send(d)
309 self._send(d)
314 self._send("", flush=True)
310 self._send("", flush=True)
315 r = self._recv()
311 r = self._recv()
316 if r:
312 if r:
317 return '', r
313 return '', r
318 return self._recv(), ''
314 return self._recv(), ''
319
315
320 def _calltwowaystream(self, cmd, fp, **args):
316 def _calltwowaystream(self, cmd, fp, **args):
321 r = self._call(cmd, **args)
317 r = self._call(cmd, **args)
322 if r:
318 if r:
323 # XXX needs to be made better
319 # XXX needs to be made better
324 raise error.Abort(_('unexpected remote reply: %s') % r)
320 raise error.Abort(_('unexpected remote reply: %s') % r)
325 for d in iter(lambda: fp.read(4096), ''):
321 for d in iter(lambda: fp.read(4096), ''):
326 self._send(d)
322 self._send(d)
327 self._send("", flush=True)
323 self._send("", flush=True)
328 return self._pipei
324 return self._pipei
329
325
330 def _getamount(self):
326 def _getamount(self):
331 l = self._pipei.readline()
327 l = self._pipei.readline()
332 if l == '\n':
328 if l == '\n':
333 self._readerr()
329 self._readerr()
334 msg = _('check previous remote output')
330 msg = _('check previous remote output')
335 self._abort(error.OutOfBandError(hint=msg))
331 self._abort(error.OutOfBandError(hint=msg))
336 self._readerr()
332 self._readerr()
337 try:
333 try:
338 return int(l)
334 return int(l)
339 except ValueError:
335 except ValueError:
340 self._abort(error.ResponseError(_("unexpected response:"), l))
336 self._abort(error.ResponseError(_("unexpected response:"), l))
341
337
342 def _recv(self):
338 def _recv(self):
343 return self._pipei.read(self._getamount())
339 return self._pipei.read(self._getamount())
344
340
345 def _send(self, data, flush=False):
341 def _send(self, data, flush=False):
346 self._pipeo.write("%d\n" % len(data))
342 self._pipeo.write("%d\n" % len(data))
347 if data:
343 if data:
348 self._pipeo.write(data)
344 self._pipeo.write(data)
349 if flush:
345 if flush:
350 self._pipeo.flush()
346 self._pipeo.flush()
351 self._readerr()
347 self._readerr()
352
348
353 instance = sshpeer
349 instance = sshpeer
@@ -1,1050 +1,1059 b''
1 # wireproto.py - generic wire protocol support functions
1 # wireproto.py - generic wire protocol support functions
2 #
2 #
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import hashlib
10 import hashlib
11 import os
11 import os
12 import tempfile
12 import tempfile
13
13
14 from .i18n import _
14 from .i18n import _
15 from .node import (
15 from .node import (
16 bin,
16 bin,
17 hex,
17 hex,
18 nullid,
18 nullid,
19 )
19 )
20
20
21 from . import (
21 from . import (
22 bundle2,
22 bundle2,
23 changegroup as changegroupmod,
23 changegroup as changegroupmod,
24 encoding,
24 encoding,
25 error,
25 error,
26 exchange,
26 exchange,
27 peer,
27 peer,
28 pushkey as pushkeymod,
28 pushkey as pushkeymod,
29 pycompat,
29 pycompat,
30 repository,
30 streamclone,
31 streamclone,
31 util,
32 util,
32 )
33 )
33
34
34 urlerr = util.urlerr
35 urlerr = util.urlerr
35 urlreq = util.urlreq
36 urlreq = util.urlreq
36
37
37 bundle2requiredmain = _('incompatible Mercurial client; bundle2 required')
38 bundle2requiredmain = _('incompatible Mercurial client; bundle2 required')
38 bundle2requiredhint = _('see https://www.mercurial-scm.org/wiki/'
39 bundle2requiredhint = _('see https://www.mercurial-scm.org/wiki/'
39 'IncompatibleClient')
40 'IncompatibleClient')
40 bundle2required = '%s\n(%s)\n' % (bundle2requiredmain, bundle2requiredhint)
41 bundle2required = '%s\n(%s)\n' % (bundle2requiredmain, bundle2requiredhint)
41
42
42 class abstractserverproto(object):
43 class abstractserverproto(object):
43 """abstract class that summarizes the protocol API
44 """abstract class that summarizes the protocol API
44
45
45 Used as reference and documentation.
46 Used as reference and documentation.
46 """
47 """
47
48
48 def getargs(self, args):
49 def getargs(self, args):
49 """return the value for arguments in <args>
50 """return the value for arguments in <args>
50
51
51 returns a list of values (same order as <args>)"""
52 returns a list of values (same order as <args>)"""
52 raise NotImplementedError()
53 raise NotImplementedError()
53
54
54 def getfile(self, fp):
55 def getfile(self, fp):
55 """write the whole content of a file into a file like object
56 """write the whole content of a file into a file like object
56
57
57 The file is in the form::
58 The file is in the form::
58
59
59 (<chunk-size>\n<chunk>)+0\n
60 (<chunk-size>\n<chunk>)+0\n
60
61
61 chunk size is the ascii version of the int.
62 chunk size is the ascii version of the int.
62 """
63 """
63 raise NotImplementedError()
64 raise NotImplementedError()
64
65
65 def redirect(self):
66 def redirect(self):
66 """may setup interception for stdout and stderr
67 """may setup interception for stdout and stderr
67
68
68 See also the `restore` method."""
69 See also the `restore` method."""
69 raise NotImplementedError()
70 raise NotImplementedError()
70
71
71 # If the `redirect` function does install interception, the `restore`
72 # If the `redirect` function does install interception, the `restore`
72 # function MUST be defined. If interception is not used, this function
73 # function MUST be defined. If interception is not used, this function
73 # MUST NOT be defined.
74 # MUST NOT be defined.
74 #
75 #
75 # left commented here on purpose
76 # left commented here on purpose
76 #
77 #
77 #def restore(self):
78 #def restore(self):
78 # """reinstall previous stdout and stderr and return intercepted stdout
79 # """reinstall previous stdout and stderr and return intercepted stdout
79 # """
80 # """
80 # raise NotImplementedError()
81 # raise NotImplementedError()
81
82
82 class remoteiterbatcher(peer.iterbatcher):
83 class remoteiterbatcher(peer.iterbatcher):
83 def __init__(self, remote):
84 def __init__(self, remote):
84 super(remoteiterbatcher, self).__init__()
85 super(remoteiterbatcher, self).__init__()
85 self._remote = remote
86 self._remote = remote
86
87
87 def __getattr__(self, name):
88 def __getattr__(self, name):
88 # Validate this method is batchable, since submit() only supports
89 # Validate this method is batchable, since submit() only supports
89 # batchable methods.
90 # batchable methods.
90 fn = getattr(self._remote, name)
91 fn = getattr(self._remote, name)
91 if not getattr(fn, 'batchable', None):
92 if not getattr(fn, 'batchable', None):
92 raise error.ProgrammingError('Attempted to batch a non-batchable '
93 raise error.ProgrammingError('Attempted to batch a non-batchable '
93 'call to %r' % name)
94 'call to %r' % name)
94
95
95 return super(remoteiterbatcher, self).__getattr__(name)
96 return super(remoteiterbatcher, self).__getattr__(name)
96
97
97 def submit(self):
98 def submit(self):
98 """Break the batch request into many patch calls and pipeline them.
99 """Break the batch request into many patch calls and pipeline them.
99
100
100 This is mostly valuable over http where request sizes can be
101 This is mostly valuable over http where request sizes can be
101 limited, but can be used in other places as well.
102 limited, but can be used in other places as well.
102 """
103 """
103 # 2-tuple of (command, arguments) that represents what will be
104 # 2-tuple of (command, arguments) that represents what will be
104 # sent over the wire.
105 # sent over the wire.
105 requests = []
106 requests = []
106
107
107 # 4-tuple of (command, final future, @batchable generator, remote
108 # 4-tuple of (command, final future, @batchable generator, remote
108 # future).
109 # future).
109 results = []
110 results = []
110
111
111 for command, args, opts, finalfuture in self.calls:
112 for command, args, opts, finalfuture in self.calls:
112 mtd = getattr(self._remote, command)
113 mtd = getattr(self._remote, command)
113 batchable = mtd.batchable(mtd.im_self, *args, **opts)
114 batchable = mtd.batchable(mtd.im_self, *args, **opts)
114
115
115 commandargs, fremote = next(batchable)
116 commandargs, fremote = next(batchable)
116 assert fremote
117 assert fremote
117 requests.append((command, commandargs))
118 requests.append((command, commandargs))
118 results.append((command, finalfuture, batchable, fremote))
119 results.append((command, finalfuture, batchable, fremote))
119
120
120 if requests:
121 if requests:
121 self._resultiter = self._remote._submitbatch(requests)
122 self._resultiter = self._remote._submitbatch(requests)
122
123
123 self._results = results
124 self._results = results
124
125
125 def results(self):
126 def results(self):
126 for command, finalfuture, batchable, remotefuture in self._results:
127 for command, finalfuture, batchable, remotefuture in self._results:
127 # Get the raw result, set it in the remote future, feed it
128 # Get the raw result, set it in the remote future, feed it
128 # back into the @batchable generator so it can be decoded, and
129 # back into the @batchable generator so it can be decoded, and
129 # set the result on the final future to this value.
130 # set the result on the final future to this value.
130 remoteresult = next(self._resultiter)
131 remoteresult = next(self._resultiter)
131 remotefuture.set(remoteresult)
132 remotefuture.set(remoteresult)
132 finalfuture.set(next(batchable))
133 finalfuture.set(next(batchable))
133
134
134 # Verify our @batchable generators only emit 2 values.
135 # Verify our @batchable generators only emit 2 values.
135 try:
136 try:
136 next(batchable)
137 next(batchable)
137 except StopIteration:
138 except StopIteration:
138 pass
139 pass
139 else:
140 else:
140 raise error.ProgrammingError('%s @batchable generator emitted '
141 raise error.ProgrammingError('%s @batchable generator emitted '
141 'unexpected value count' % command)
142 'unexpected value count' % command)
142
143
143 yield finalfuture.value
144 yield finalfuture.value
144
145
145 # Forward a couple of names from peer to make wireproto interactions
146 # Forward a couple of names from peer to make wireproto interactions
146 # slightly more sensible.
147 # slightly more sensible.
147 batchable = peer.batchable
148 batchable = peer.batchable
148 future = peer.future
149 future = peer.future
149
150
150 # list of nodes encoding / decoding
151 # list of nodes encoding / decoding
151
152
152 def decodelist(l, sep=' '):
153 def decodelist(l, sep=' '):
153 if l:
154 if l:
154 return map(bin, l.split(sep))
155 return map(bin, l.split(sep))
155 return []
156 return []
156
157
157 def encodelist(l, sep=' '):
158 def encodelist(l, sep=' '):
158 try:
159 try:
159 return sep.join(map(hex, l))
160 return sep.join(map(hex, l))
160 except TypeError:
161 except TypeError:
161 raise
162 raise
162
163
163 # batched call argument encoding
164 # batched call argument encoding
164
165
165 def escapearg(plain):
166 def escapearg(plain):
166 return (plain
167 return (plain
167 .replace(':', ':c')
168 .replace(':', ':c')
168 .replace(',', ':o')
169 .replace(',', ':o')
169 .replace(';', ':s')
170 .replace(';', ':s')
170 .replace('=', ':e'))
171 .replace('=', ':e'))
171
172
172 def unescapearg(escaped):
173 def unescapearg(escaped):
173 return (escaped
174 return (escaped
174 .replace(':e', '=')
175 .replace(':e', '=')
175 .replace(':s', ';')
176 .replace(':s', ';')
176 .replace(':o', ',')
177 .replace(':o', ',')
177 .replace(':c', ':'))
178 .replace(':c', ':'))
178
179
179 def encodebatchcmds(req):
180 def encodebatchcmds(req):
180 """Return a ``cmds`` argument value for the ``batch`` command."""
181 """Return a ``cmds`` argument value for the ``batch`` command."""
181 cmds = []
182 cmds = []
182 for op, argsdict in req:
183 for op, argsdict in req:
183 # Old servers didn't properly unescape argument names. So prevent
184 # Old servers didn't properly unescape argument names. So prevent
184 # the sending of argument names that may not be decoded properly by
185 # the sending of argument names that may not be decoded properly by
185 # servers.
186 # servers.
186 assert all(escapearg(k) == k for k in argsdict)
187 assert all(escapearg(k) == k for k in argsdict)
187
188
188 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
189 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
189 for k, v in argsdict.iteritems())
190 for k, v in argsdict.iteritems())
190 cmds.append('%s %s' % (op, args))
191 cmds.append('%s %s' % (op, args))
191
192
192 return ';'.join(cmds)
193 return ';'.join(cmds)
193
194
194 # mapping of options accepted by getbundle and their types
195 # mapping of options accepted by getbundle and their types
195 #
196 #
196 # Meant to be extended by extensions. It is extensions responsibility to ensure
197 # Meant to be extended by extensions. It is extensions responsibility to ensure
197 # such options are properly processed in exchange.getbundle.
198 # such options are properly processed in exchange.getbundle.
198 #
199 #
199 # supported types are:
200 # supported types are:
200 #
201 #
201 # :nodes: list of binary nodes
202 # :nodes: list of binary nodes
202 # :csv: list of comma-separated values
203 # :csv: list of comma-separated values
203 # :scsv: list of comma-separated values return as set
204 # :scsv: list of comma-separated values return as set
204 # :plain: string with no transformation needed.
205 # :plain: string with no transformation needed.
205 gboptsmap = {'heads': 'nodes',
206 gboptsmap = {'heads': 'nodes',
206 'common': 'nodes',
207 'common': 'nodes',
207 'obsmarkers': 'boolean',
208 'obsmarkers': 'boolean',
208 'bundlecaps': 'scsv',
209 'bundlecaps': 'scsv',
209 'listkeys': 'csv',
210 'listkeys': 'csv',
210 'cg': 'boolean',
211 'cg': 'boolean',
211 'cbattempted': 'boolean'}
212 'cbattempted': 'boolean'}
212
213
213 # client side
214 # client side
214
215
215 class wirepeer(peer.peerrepository):
216 class wirepeer(repository.legacypeer):
216 """Client-side interface for communicating with a peer repository.
217 """Client-side interface for communicating with a peer repository.
217
218
218 Methods commonly call wire protocol commands of the same name.
219 Methods commonly call wire protocol commands of the same name.
219
220
220 See also httppeer.py and sshpeer.py for protocol-specific
221 See also httppeer.py and sshpeer.py for protocol-specific
221 implementations of this interface.
222 implementations of this interface.
222 """
223 """
223 def _submitbatch(self, req):
224 # Begin of basewirepeer interface.
224 """run batch request <req> on the server
225
226 Returns an iterator of the raw responses from the server.
227 """
228 rsp = self._callstream("batch", cmds=encodebatchcmds(req))
229 chunk = rsp.read(1024)
230 work = [chunk]
231 while chunk:
232 while ';' not in chunk and chunk:
233 chunk = rsp.read(1024)
234 work.append(chunk)
235 merged = ''.join(work)
236 while ';' in merged:
237 one, merged = merged.split(';', 1)
238 yield unescapearg(one)
239 chunk = rsp.read(1024)
240 work = [merged, chunk]
241 yield unescapearg(''.join(work))
242
243 def _submitone(self, op, args):
244 return self._call(op, **args)
245
225
246 def iterbatch(self):
226 def iterbatch(self):
247 return remoteiterbatcher(self)
227 return remoteiterbatcher(self)
248
228
249 @batchable
229 @batchable
250 def lookup(self, key):
230 def lookup(self, key):
251 self.requirecap('lookup', _('look up remote revision'))
231 self.requirecap('lookup', _('look up remote revision'))
252 f = future()
232 f = future()
253 yield {'key': encoding.fromlocal(key)}, f
233 yield {'key': encoding.fromlocal(key)}, f
254 d = f.value
234 d = f.value
255 success, data = d[:-1].split(" ", 1)
235 success, data = d[:-1].split(" ", 1)
256 if int(success):
236 if int(success):
257 yield bin(data)
237 yield bin(data)
258 self._abort(error.RepoError(data))
238 self._abort(error.RepoError(data))
259
239
260 @batchable
240 @batchable
261 def heads(self):
241 def heads(self):
262 f = future()
242 f = future()
263 yield {}, f
243 yield {}, f
264 d = f.value
244 d = f.value
265 try:
245 try:
266 yield decodelist(d[:-1])
246 yield decodelist(d[:-1])
267 except ValueError:
247 except ValueError:
268 self._abort(error.ResponseError(_("unexpected response:"), d))
248 self._abort(error.ResponseError(_("unexpected response:"), d))
269
249
270 @batchable
250 @batchable
271 def known(self, nodes):
251 def known(self, nodes):
272 f = future()
252 f = future()
273 yield {'nodes': encodelist(nodes)}, f
253 yield {'nodes': encodelist(nodes)}, f
274 d = f.value
254 d = f.value
275 try:
255 try:
276 yield [bool(int(b)) for b in d]
256 yield [bool(int(b)) for b in d]
277 except ValueError:
257 except ValueError:
278 self._abort(error.ResponseError(_("unexpected response:"), d))
258 self._abort(error.ResponseError(_("unexpected response:"), d))
279
259
280 @batchable
260 @batchable
281 def branchmap(self):
261 def branchmap(self):
282 f = future()
262 f = future()
283 yield {}, f
263 yield {}, f
284 d = f.value
264 d = f.value
285 try:
265 try:
286 branchmap = {}
266 branchmap = {}
287 for branchpart in d.splitlines():
267 for branchpart in d.splitlines():
288 branchname, branchheads = branchpart.split(' ', 1)
268 branchname, branchheads = branchpart.split(' ', 1)
289 branchname = encoding.tolocal(urlreq.unquote(branchname))
269 branchname = encoding.tolocal(urlreq.unquote(branchname))
290 branchheads = decodelist(branchheads)
270 branchheads = decodelist(branchheads)
291 branchmap[branchname] = branchheads
271 branchmap[branchname] = branchheads
292 yield branchmap
272 yield branchmap
293 except TypeError:
273 except TypeError:
294 self._abort(error.ResponseError(_("unexpected response:"), d))
274 self._abort(error.ResponseError(_("unexpected response:"), d))
295
275
296 def branches(self, nodes):
276 @batchable
297 n = encodelist(nodes)
277 def listkeys(self, namespace):
298 d = self._call("branches", nodes=n)
278 if not self.capable('pushkey'):
299 try:
279 yield {}, None
300 br = [tuple(decodelist(b)) for b in d.splitlines()]
280 f = future()
301 return br
281 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
302 except ValueError:
282 yield {'namespace': encoding.fromlocal(namespace)}, f
303 self._abort(error.ResponseError(_("unexpected response:"), d))
283 d = f.value
304
284 self.ui.debug('received listkey for "%s": %i bytes\n'
305 def between(self, pairs):
285 % (namespace, len(d)))
306 batch = 8 # avoid giant requests
286 yield pushkeymod.decodekeys(d)
307 r = []
308 for i in xrange(0, len(pairs), batch):
309 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
310 d = self._call("between", pairs=n)
311 try:
312 r.extend(l and decodelist(l) or [] for l in d.splitlines())
313 except ValueError:
314 self._abort(error.ResponseError(_("unexpected response:"), d))
315 return r
316
287
317 @batchable
288 @batchable
318 def pushkey(self, namespace, key, old, new):
289 def pushkey(self, namespace, key, old, new):
319 if not self.capable('pushkey'):
290 if not self.capable('pushkey'):
320 yield False, None
291 yield False, None
321 f = future()
292 f = future()
322 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
293 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
323 yield {'namespace': encoding.fromlocal(namespace),
294 yield {'namespace': encoding.fromlocal(namespace),
324 'key': encoding.fromlocal(key),
295 'key': encoding.fromlocal(key),
325 'old': encoding.fromlocal(old),
296 'old': encoding.fromlocal(old),
326 'new': encoding.fromlocal(new)}, f
297 'new': encoding.fromlocal(new)}, f
327 d = f.value
298 d = f.value
328 d, output = d.split('\n', 1)
299 d, output = d.split('\n', 1)
329 try:
300 try:
330 d = bool(int(d))
301 d = bool(int(d))
331 except ValueError:
302 except ValueError:
332 raise error.ResponseError(
303 raise error.ResponseError(
333 _('push failed (unexpected response):'), d)
304 _('push failed (unexpected response):'), d)
334 for l in output.splitlines(True):
305 for l in output.splitlines(True):
335 self.ui.status(_('remote: '), l)
306 self.ui.status(_('remote: '), l)
336 yield d
307 yield d
337
308
338 @batchable
339 def listkeys(self, namespace):
340 if not self.capable('pushkey'):
341 yield {}, None
342 f = future()
343 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
344 yield {'namespace': encoding.fromlocal(namespace)}, f
345 d = f.value
346 self.ui.debug('received listkey for "%s": %i bytes\n'
347 % (namespace, len(d)))
348 yield pushkeymod.decodekeys(d)
349
350 def stream_out(self):
309 def stream_out(self):
351 return self._callstream('stream_out')
310 return self._callstream('stream_out')
352
311
353 def changegroup(self, nodes, kind):
354 n = encodelist(nodes)
355 f = self._callcompressable("changegroup", roots=n)
356 return changegroupmod.cg1unpacker(f, 'UN')
357
358 def changegroupsubset(self, bases, heads, kind):
359 self.requirecap('changegroupsubset', _('look up remote changes'))
360 bases = encodelist(bases)
361 heads = encodelist(heads)
362 f = self._callcompressable("changegroupsubset",
363 bases=bases, heads=heads)
364 return changegroupmod.cg1unpacker(f, 'UN')
365
366 def getbundle(self, source, **kwargs):
312 def getbundle(self, source, **kwargs):
367 self.requirecap('getbundle', _('look up remote changes'))
313 self.requirecap('getbundle', _('look up remote changes'))
368 opts = {}
314 opts = {}
369 bundlecaps = kwargs.get('bundlecaps')
315 bundlecaps = kwargs.get('bundlecaps')
370 if bundlecaps is not None:
316 if bundlecaps is not None:
371 kwargs['bundlecaps'] = sorted(bundlecaps)
317 kwargs['bundlecaps'] = sorted(bundlecaps)
372 else:
318 else:
373 bundlecaps = () # kwargs could have it to None
319 bundlecaps = () # kwargs could have it to None
374 for key, value in kwargs.iteritems():
320 for key, value in kwargs.iteritems():
375 if value is None:
321 if value is None:
376 continue
322 continue
377 keytype = gboptsmap.get(key)
323 keytype = gboptsmap.get(key)
378 if keytype is None:
324 if keytype is None:
379 assert False, 'unexpected'
325 assert False, 'unexpected'
380 elif keytype == 'nodes':
326 elif keytype == 'nodes':
381 value = encodelist(value)
327 value = encodelist(value)
382 elif keytype in ('csv', 'scsv'):
328 elif keytype in ('csv', 'scsv'):
383 value = ','.join(value)
329 value = ','.join(value)
384 elif keytype == 'boolean':
330 elif keytype == 'boolean':
385 value = '%i' % bool(value)
331 value = '%i' % bool(value)
386 elif keytype != 'plain':
332 elif keytype != 'plain':
387 raise KeyError('unknown getbundle option type %s'
333 raise KeyError('unknown getbundle option type %s'
388 % keytype)
334 % keytype)
389 opts[key] = value
335 opts[key] = value
390 f = self._callcompressable("getbundle", **opts)
336 f = self._callcompressable("getbundle", **opts)
391 if any((cap.startswith('HG2') for cap in bundlecaps)):
337 if any((cap.startswith('HG2') for cap in bundlecaps)):
392 return bundle2.getunbundler(self.ui, f)
338 return bundle2.getunbundler(self.ui, f)
393 else:
339 else:
394 return changegroupmod.cg1unpacker(f, 'UN')
340 return changegroupmod.cg1unpacker(f, 'UN')
395
341
396 def unbundle(self, cg, heads, url):
342 def unbundle(self, cg, heads, url):
397 '''Send cg (a readable file-like object representing the
343 '''Send cg (a readable file-like object representing the
398 changegroup to push, typically a chunkbuffer object) to the
344 changegroup to push, typically a chunkbuffer object) to the
399 remote server as a bundle.
345 remote server as a bundle.
400
346
401 When pushing a bundle10 stream, return an integer indicating the
347 When pushing a bundle10 stream, return an integer indicating the
402 result of the push (see changegroup.apply()).
348 result of the push (see changegroup.apply()).
403
349
404 When pushing a bundle20 stream, return a bundle20 stream.
350 When pushing a bundle20 stream, return a bundle20 stream.
405
351
406 `url` is the url the client thinks it's pushing to, which is
352 `url` is the url the client thinks it's pushing to, which is
407 visible to hooks.
353 visible to hooks.
408 '''
354 '''
409
355
410 if heads != ['force'] and self.capable('unbundlehash'):
356 if heads != ['force'] and self.capable('unbundlehash'):
411 heads = encodelist(['hashed',
357 heads = encodelist(['hashed',
412 hashlib.sha1(''.join(sorted(heads))).digest()])
358 hashlib.sha1(''.join(sorted(heads))).digest()])
413 else:
359 else:
414 heads = encodelist(heads)
360 heads = encodelist(heads)
415
361
416 if util.safehasattr(cg, 'deltaheader'):
362 if util.safehasattr(cg, 'deltaheader'):
417 # this a bundle10, do the old style call sequence
363 # this a bundle10, do the old style call sequence
418 ret, output = self._callpush("unbundle", cg, heads=heads)
364 ret, output = self._callpush("unbundle", cg, heads=heads)
419 if ret == "":
365 if ret == "":
420 raise error.ResponseError(
366 raise error.ResponseError(
421 _('push failed:'), output)
367 _('push failed:'), output)
422 try:
368 try:
423 ret = int(ret)
369 ret = int(ret)
424 except ValueError:
370 except ValueError:
425 raise error.ResponseError(
371 raise error.ResponseError(
426 _('push failed (unexpected response):'), ret)
372 _('push failed (unexpected response):'), ret)
427
373
428 for l in output.splitlines(True):
374 for l in output.splitlines(True):
429 self.ui.status(_('remote: '), l)
375 self.ui.status(_('remote: '), l)
430 else:
376 else:
431 # bundle2 push. Send a stream, fetch a stream.
377 # bundle2 push. Send a stream, fetch a stream.
432 stream = self._calltwowaystream('unbundle', cg, heads=heads)
378 stream = self._calltwowaystream('unbundle', cg, heads=heads)
433 ret = bundle2.getunbundler(self.ui, stream)
379 ret = bundle2.getunbundler(self.ui, stream)
434 return ret
380 return ret
435
381
382 # End of basewirepeer interface.
383
384 # Begin of baselegacywirepeer interface.
385
386 def branches(self, nodes):
387 n = encodelist(nodes)
388 d = self._call("branches", nodes=n)
389 try:
390 br = [tuple(decodelist(b)) for b in d.splitlines()]
391 return br
392 except ValueError:
393 self._abort(error.ResponseError(_("unexpected response:"), d))
394
395 def between(self, pairs):
396 batch = 8 # avoid giant requests
397 r = []
398 for i in xrange(0, len(pairs), batch):
399 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
400 d = self._call("between", pairs=n)
401 try:
402 r.extend(l and decodelist(l) or [] for l in d.splitlines())
403 except ValueError:
404 self._abort(error.ResponseError(_("unexpected response:"), d))
405 return r
406
407 def changegroup(self, nodes, kind):
408 n = encodelist(nodes)
409 f = self._callcompressable("changegroup", roots=n)
410 return changegroupmod.cg1unpacker(f, 'UN')
411
412 def changegroupsubset(self, bases, heads, kind):
413 self.requirecap('changegroupsubset', _('look up remote changes'))
414 bases = encodelist(bases)
415 heads = encodelist(heads)
416 f = self._callcompressable("changegroupsubset",
417 bases=bases, heads=heads)
418 return changegroupmod.cg1unpacker(f, 'UN')
419
420 # End of baselegacywirepeer interface.
421
422 def _submitbatch(self, req):
423 """run batch request <req> on the server
424
425 Returns an iterator of the raw responses from the server.
426 """
427 rsp = self._callstream("batch", cmds=encodebatchcmds(req))
428 chunk = rsp.read(1024)
429 work = [chunk]
430 while chunk:
431 while ';' not in chunk and chunk:
432 chunk = rsp.read(1024)
433 work.append(chunk)
434 merged = ''.join(work)
435 while ';' in merged:
436 one, merged = merged.split(';', 1)
437 yield unescapearg(one)
438 chunk = rsp.read(1024)
439 work = [merged, chunk]
440 yield unescapearg(''.join(work))
441
442 def _submitone(self, op, args):
443 return self._call(op, **args)
444
436 def debugwireargs(self, one, two, three=None, four=None, five=None):
445 def debugwireargs(self, one, two, three=None, four=None, five=None):
437 # don't pass optional arguments left at their default value
446 # don't pass optional arguments left at their default value
438 opts = {}
447 opts = {}
439 if three is not None:
448 if three is not None:
440 opts['three'] = three
449 opts['three'] = three
441 if four is not None:
450 if four is not None:
442 opts['four'] = four
451 opts['four'] = four
443 return self._call('debugwireargs', one=one, two=two, **opts)
452 return self._call('debugwireargs', one=one, two=two, **opts)
444
453
445 def _call(self, cmd, **args):
454 def _call(self, cmd, **args):
446 """execute <cmd> on the server
455 """execute <cmd> on the server
447
456
448 The command is expected to return a simple string.
457 The command is expected to return a simple string.
449
458
450 returns the server reply as a string."""
459 returns the server reply as a string."""
451 raise NotImplementedError()
460 raise NotImplementedError()
452
461
453 def _callstream(self, cmd, **args):
462 def _callstream(self, cmd, **args):
454 """execute <cmd> on the server
463 """execute <cmd> on the server
455
464
456 The command is expected to return a stream. Note that if the
465 The command is expected to return a stream. Note that if the
457 command doesn't return a stream, _callstream behaves
466 command doesn't return a stream, _callstream behaves
458 differently for ssh and http peers.
467 differently for ssh and http peers.
459
468
460 returns the server reply as a file like object.
469 returns the server reply as a file like object.
461 """
470 """
462 raise NotImplementedError()
471 raise NotImplementedError()
463
472
464 def _callcompressable(self, cmd, **args):
473 def _callcompressable(self, cmd, **args):
465 """execute <cmd> on the server
474 """execute <cmd> on the server
466
475
467 The command is expected to return a stream.
476 The command is expected to return a stream.
468
477
469 The stream may have been compressed in some implementations. This
478 The stream may have been compressed in some implementations. This
470 function takes care of the decompression. This is the only difference
479 function takes care of the decompression. This is the only difference
471 with _callstream.
480 with _callstream.
472
481
473 returns the server reply as a file like object.
482 returns the server reply as a file like object.
474 """
483 """
475 raise NotImplementedError()
484 raise NotImplementedError()
476
485
477 def _callpush(self, cmd, fp, **args):
486 def _callpush(self, cmd, fp, **args):
478 """execute a <cmd> on server
487 """execute a <cmd> on server
479
488
480 The command is expected to be related to a push. Push has a special
489 The command is expected to be related to a push. Push has a special
481 return method.
490 return method.
482
491
483 returns the server reply as a (ret, output) tuple. ret is either
492 returns the server reply as a (ret, output) tuple. ret is either
484 empty (error) or a stringified int.
493 empty (error) or a stringified int.
485 """
494 """
486 raise NotImplementedError()
495 raise NotImplementedError()
487
496
488 def _calltwowaystream(self, cmd, fp, **args):
497 def _calltwowaystream(self, cmd, fp, **args):
489 """execute <cmd> on server
498 """execute <cmd> on server
490
499
491 The command will send a stream to the server and get a stream in reply.
500 The command will send a stream to the server and get a stream in reply.
492 """
501 """
493 raise NotImplementedError()
502 raise NotImplementedError()
494
503
495 def _abort(self, exception):
504 def _abort(self, exception):
496 """clearly abort the wire protocol connection and raise the exception
505 """clearly abort the wire protocol connection and raise the exception
497 """
506 """
498 raise NotImplementedError()
507 raise NotImplementedError()
499
508
500 # server side
509 # server side
501
510
502 # wire protocol command can either return a string or one of these classes.
511 # wire protocol command can either return a string or one of these classes.
503 class streamres(object):
512 class streamres(object):
504 """wireproto reply: binary stream
513 """wireproto reply: binary stream
505
514
506 The call was successful and the result is a stream.
515 The call was successful and the result is a stream.
507
516
508 Accepts either a generator or an object with a ``read(size)`` method.
517 Accepts either a generator or an object with a ``read(size)`` method.
509
518
510 ``v1compressible`` indicates whether this data can be compressed to
519 ``v1compressible`` indicates whether this data can be compressed to
511 "version 1" clients (technically: HTTP peers using
520 "version 1" clients (technically: HTTP peers using
512 application/mercurial-0.1 media type). This flag should NOT be used on
521 application/mercurial-0.1 media type). This flag should NOT be used on
513 new commands because new clients should support a more modern compression
522 new commands because new clients should support a more modern compression
514 mechanism.
523 mechanism.
515 """
524 """
516 def __init__(self, gen=None, reader=None, v1compressible=False):
525 def __init__(self, gen=None, reader=None, v1compressible=False):
517 self.gen = gen
526 self.gen = gen
518 self.reader = reader
527 self.reader = reader
519 self.v1compressible = v1compressible
528 self.v1compressible = v1compressible
520
529
521 class pushres(object):
530 class pushres(object):
522 """wireproto reply: success with simple integer return
531 """wireproto reply: success with simple integer return
523
532
524 The call was successful and returned an integer contained in `self.res`.
533 The call was successful and returned an integer contained in `self.res`.
525 """
534 """
526 def __init__(self, res):
535 def __init__(self, res):
527 self.res = res
536 self.res = res
528
537
529 class pusherr(object):
538 class pusherr(object):
530 """wireproto reply: failure
539 """wireproto reply: failure
531
540
532 The call failed. The `self.res` attribute contains the error message.
541 The call failed. The `self.res` attribute contains the error message.
533 """
542 """
534 def __init__(self, res):
543 def __init__(self, res):
535 self.res = res
544 self.res = res
536
545
537 class ooberror(object):
546 class ooberror(object):
538 """wireproto reply: failure of a batch of operation
547 """wireproto reply: failure of a batch of operation
539
548
540 Something failed during a batch call. The error message is stored in
549 Something failed during a batch call. The error message is stored in
541 `self.message`.
550 `self.message`.
542 """
551 """
543 def __init__(self, message):
552 def __init__(self, message):
544 self.message = message
553 self.message = message
545
554
546 def getdispatchrepo(repo, proto, command):
555 def getdispatchrepo(repo, proto, command):
547 """Obtain the repo used for processing wire protocol commands.
556 """Obtain the repo used for processing wire protocol commands.
548
557
549 The intent of this function is to serve as a monkeypatch point for
558 The intent of this function is to serve as a monkeypatch point for
550 extensions that need commands to operate on different repo views under
559 extensions that need commands to operate on different repo views under
551 specialized circumstances.
560 specialized circumstances.
552 """
561 """
553 return repo.filtered('served')
562 return repo.filtered('served')
554
563
555 def dispatch(repo, proto, command):
564 def dispatch(repo, proto, command):
556 repo = getdispatchrepo(repo, proto, command)
565 repo = getdispatchrepo(repo, proto, command)
557 func, spec = commands[command]
566 func, spec = commands[command]
558 args = proto.getargs(spec)
567 args = proto.getargs(spec)
559 return func(repo, proto, *args)
568 return func(repo, proto, *args)
560
569
561 def options(cmd, keys, others):
570 def options(cmd, keys, others):
562 opts = {}
571 opts = {}
563 for k in keys:
572 for k in keys:
564 if k in others:
573 if k in others:
565 opts[k] = others[k]
574 opts[k] = others[k]
566 del others[k]
575 del others[k]
567 if others:
576 if others:
568 util.stderr.write("warning: %s ignored unexpected arguments %s\n"
577 util.stderr.write("warning: %s ignored unexpected arguments %s\n"
569 % (cmd, ",".join(others)))
578 % (cmd, ",".join(others)))
570 return opts
579 return opts
571
580
572 def bundle1allowed(repo, action):
581 def bundle1allowed(repo, action):
573 """Whether a bundle1 operation is allowed from the server.
582 """Whether a bundle1 operation is allowed from the server.
574
583
575 Priority is:
584 Priority is:
576
585
577 1. server.bundle1gd.<action> (if generaldelta active)
586 1. server.bundle1gd.<action> (if generaldelta active)
578 2. server.bundle1.<action>
587 2. server.bundle1.<action>
579 3. server.bundle1gd (if generaldelta active)
588 3. server.bundle1gd (if generaldelta active)
580 4. server.bundle1
589 4. server.bundle1
581 """
590 """
582 ui = repo.ui
591 ui = repo.ui
583 gd = 'generaldelta' in repo.requirements
592 gd = 'generaldelta' in repo.requirements
584
593
585 if gd:
594 if gd:
586 v = ui.configbool('server', 'bundle1gd.%s' % action, None)
595 v = ui.configbool('server', 'bundle1gd.%s' % action, None)
587 if v is not None:
596 if v is not None:
588 return v
597 return v
589
598
590 v = ui.configbool('server', 'bundle1.%s' % action, None)
599 v = ui.configbool('server', 'bundle1.%s' % action, None)
591 if v is not None:
600 if v is not None:
592 return v
601 return v
593
602
594 if gd:
603 if gd:
595 v = ui.configbool('server', 'bundle1gd')
604 v = ui.configbool('server', 'bundle1gd')
596 if v is not None:
605 if v is not None:
597 return v
606 return v
598
607
599 return ui.configbool('server', 'bundle1')
608 return ui.configbool('server', 'bundle1')
600
609
601 def supportedcompengines(ui, proto, role):
610 def supportedcompengines(ui, proto, role):
602 """Obtain the list of supported compression engines for a request."""
611 """Obtain the list of supported compression engines for a request."""
603 assert role in (util.CLIENTROLE, util.SERVERROLE)
612 assert role in (util.CLIENTROLE, util.SERVERROLE)
604
613
605 compengines = util.compengines.supportedwireengines(role)
614 compengines = util.compengines.supportedwireengines(role)
606
615
607 # Allow config to override default list and ordering.
616 # Allow config to override default list and ordering.
608 if role == util.SERVERROLE:
617 if role == util.SERVERROLE:
609 configengines = ui.configlist('server', 'compressionengines')
618 configengines = ui.configlist('server', 'compressionengines')
610 config = 'server.compressionengines'
619 config = 'server.compressionengines'
611 else:
620 else:
612 # This is currently implemented mainly to facilitate testing. In most
621 # This is currently implemented mainly to facilitate testing. In most
613 # cases, the server should be in charge of choosing a compression engine
622 # cases, the server should be in charge of choosing a compression engine
614 # because a server has the most to lose from a sub-optimal choice. (e.g.
623 # because a server has the most to lose from a sub-optimal choice. (e.g.
615 # CPU DoS due to an expensive engine or a network DoS due to poor
624 # CPU DoS due to an expensive engine or a network DoS due to poor
616 # compression ratio).
625 # compression ratio).
617 configengines = ui.configlist('experimental',
626 configengines = ui.configlist('experimental',
618 'clientcompressionengines')
627 'clientcompressionengines')
619 config = 'experimental.clientcompressionengines'
628 config = 'experimental.clientcompressionengines'
620
629
621 # No explicit config. Filter out the ones that aren't supposed to be
630 # No explicit config. Filter out the ones that aren't supposed to be
622 # advertised and return default ordering.
631 # advertised and return default ordering.
623 if not configengines:
632 if not configengines:
624 attr = 'serverpriority' if role == util.SERVERROLE else 'clientpriority'
633 attr = 'serverpriority' if role == util.SERVERROLE else 'clientpriority'
625 return [e for e in compengines
634 return [e for e in compengines
626 if getattr(e.wireprotosupport(), attr) > 0]
635 if getattr(e.wireprotosupport(), attr) > 0]
627
636
628 # If compression engines are listed in the config, assume there is a good
637 # If compression engines are listed in the config, assume there is a good
629 # reason for it (like server operators wanting to achieve specific
638 # reason for it (like server operators wanting to achieve specific
630 # performance characteristics). So fail fast if the config references
639 # performance characteristics). So fail fast if the config references
631 # unusable compression engines.
640 # unusable compression engines.
632 validnames = set(e.name() for e in compengines)
641 validnames = set(e.name() for e in compengines)
633 invalidnames = set(e for e in configengines if e not in validnames)
642 invalidnames = set(e for e in configengines if e not in validnames)
634 if invalidnames:
643 if invalidnames:
635 raise error.Abort(_('invalid compression engine defined in %s: %s') %
644 raise error.Abort(_('invalid compression engine defined in %s: %s') %
636 (config, ', '.join(sorted(invalidnames))))
645 (config, ', '.join(sorted(invalidnames))))
637
646
638 compengines = [e for e in compengines if e.name() in configengines]
647 compengines = [e for e in compengines if e.name() in configengines]
639 compengines = sorted(compengines,
648 compengines = sorted(compengines,
640 key=lambda e: configengines.index(e.name()))
649 key=lambda e: configengines.index(e.name()))
641
650
642 if not compengines:
651 if not compengines:
643 raise error.Abort(_('%s config option does not specify any known '
652 raise error.Abort(_('%s config option does not specify any known '
644 'compression engines') % config,
653 'compression engines') % config,
645 hint=_('usable compression engines: %s') %
654 hint=_('usable compression engines: %s') %
646 ', '.sorted(validnames))
655 ', '.sorted(validnames))
647
656
648 return compengines
657 return compengines
649
658
650 # list of commands
659 # list of commands
651 commands = {}
660 commands = {}
652
661
653 def wireprotocommand(name, args=''):
662 def wireprotocommand(name, args=''):
654 """decorator for wire protocol command"""
663 """decorator for wire protocol command"""
655 def register(func):
664 def register(func):
656 commands[name] = (func, args)
665 commands[name] = (func, args)
657 return func
666 return func
658 return register
667 return register
659
668
660 @wireprotocommand('batch', 'cmds *')
669 @wireprotocommand('batch', 'cmds *')
661 def batch(repo, proto, cmds, others):
670 def batch(repo, proto, cmds, others):
662 repo = repo.filtered("served")
671 repo = repo.filtered("served")
663 res = []
672 res = []
664 for pair in cmds.split(';'):
673 for pair in cmds.split(';'):
665 op, args = pair.split(' ', 1)
674 op, args = pair.split(' ', 1)
666 vals = {}
675 vals = {}
667 for a in args.split(','):
676 for a in args.split(','):
668 if a:
677 if a:
669 n, v = a.split('=')
678 n, v = a.split('=')
670 vals[unescapearg(n)] = unescapearg(v)
679 vals[unescapearg(n)] = unescapearg(v)
671 func, spec = commands[op]
680 func, spec = commands[op]
672 if spec:
681 if spec:
673 keys = spec.split()
682 keys = spec.split()
674 data = {}
683 data = {}
675 for k in keys:
684 for k in keys:
676 if k == '*':
685 if k == '*':
677 star = {}
686 star = {}
678 for key in vals.keys():
687 for key in vals.keys():
679 if key not in keys:
688 if key not in keys:
680 star[key] = vals[key]
689 star[key] = vals[key]
681 data['*'] = star
690 data['*'] = star
682 else:
691 else:
683 data[k] = vals[k]
692 data[k] = vals[k]
684 result = func(repo, proto, *[data[k] for k in keys])
693 result = func(repo, proto, *[data[k] for k in keys])
685 else:
694 else:
686 result = func(repo, proto)
695 result = func(repo, proto)
687 if isinstance(result, ooberror):
696 if isinstance(result, ooberror):
688 return result
697 return result
689 res.append(escapearg(result))
698 res.append(escapearg(result))
690 return ';'.join(res)
699 return ';'.join(res)
691
700
692 @wireprotocommand('between', 'pairs')
701 @wireprotocommand('between', 'pairs')
693 def between(repo, proto, pairs):
702 def between(repo, proto, pairs):
694 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
703 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
695 r = []
704 r = []
696 for b in repo.between(pairs):
705 for b in repo.between(pairs):
697 r.append(encodelist(b) + "\n")
706 r.append(encodelist(b) + "\n")
698 return "".join(r)
707 return "".join(r)
699
708
700 @wireprotocommand('branchmap')
709 @wireprotocommand('branchmap')
701 def branchmap(repo, proto):
710 def branchmap(repo, proto):
702 branchmap = repo.branchmap()
711 branchmap = repo.branchmap()
703 heads = []
712 heads = []
704 for branch, nodes in branchmap.iteritems():
713 for branch, nodes in branchmap.iteritems():
705 branchname = urlreq.quote(encoding.fromlocal(branch))
714 branchname = urlreq.quote(encoding.fromlocal(branch))
706 branchnodes = encodelist(nodes)
715 branchnodes = encodelist(nodes)
707 heads.append('%s %s' % (branchname, branchnodes))
716 heads.append('%s %s' % (branchname, branchnodes))
708 return '\n'.join(heads)
717 return '\n'.join(heads)
709
718
710 @wireprotocommand('branches', 'nodes')
719 @wireprotocommand('branches', 'nodes')
711 def branches(repo, proto, nodes):
720 def branches(repo, proto, nodes):
712 nodes = decodelist(nodes)
721 nodes = decodelist(nodes)
713 r = []
722 r = []
714 for b in repo.branches(nodes):
723 for b in repo.branches(nodes):
715 r.append(encodelist(b) + "\n")
724 r.append(encodelist(b) + "\n")
716 return "".join(r)
725 return "".join(r)
717
726
718 @wireprotocommand('clonebundles', '')
727 @wireprotocommand('clonebundles', '')
719 def clonebundles(repo, proto):
728 def clonebundles(repo, proto):
720 """Server command for returning info for available bundles to seed clones.
729 """Server command for returning info for available bundles to seed clones.
721
730
722 Clients will parse this response and determine what bundle to fetch.
731 Clients will parse this response and determine what bundle to fetch.
723
732
724 Extensions may wrap this command to filter or dynamically emit data
733 Extensions may wrap this command to filter or dynamically emit data
725 depending on the request. e.g. you could advertise URLs for the closest
734 depending on the request. e.g. you could advertise URLs for the closest
726 data center given the client's IP address.
735 data center given the client's IP address.
727 """
736 """
728 return repo.vfs.tryread('clonebundles.manifest')
737 return repo.vfs.tryread('clonebundles.manifest')
729
738
730 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
739 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
731 'known', 'getbundle', 'unbundlehash', 'batch']
740 'known', 'getbundle', 'unbundlehash', 'batch']
732
741
733 def _capabilities(repo, proto):
742 def _capabilities(repo, proto):
734 """return a list of capabilities for a repo
743 """return a list of capabilities for a repo
735
744
736 This function exists to allow extensions to easily wrap capabilities
745 This function exists to allow extensions to easily wrap capabilities
737 computation
746 computation
738
747
739 - returns a lists: easy to alter
748 - returns a lists: easy to alter
740 - change done here will be propagated to both `capabilities` and `hello`
749 - change done here will be propagated to both `capabilities` and `hello`
741 command without any other action needed.
750 command without any other action needed.
742 """
751 """
743 # copy to prevent modification of the global list
752 # copy to prevent modification of the global list
744 caps = list(wireprotocaps)
753 caps = list(wireprotocaps)
745 if streamclone.allowservergeneration(repo):
754 if streamclone.allowservergeneration(repo):
746 if repo.ui.configbool('server', 'preferuncompressed'):
755 if repo.ui.configbool('server', 'preferuncompressed'):
747 caps.append('stream-preferred')
756 caps.append('stream-preferred')
748 requiredformats = repo.requirements & repo.supportedformats
757 requiredformats = repo.requirements & repo.supportedformats
749 # if our local revlogs are just revlogv1, add 'stream' cap
758 # if our local revlogs are just revlogv1, add 'stream' cap
750 if not requiredformats - {'revlogv1'}:
759 if not requiredformats - {'revlogv1'}:
751 caps.append('stream')
760 caps.append('stream')
752 # otherwise, add 'streamreqs' detailing our local revlog format
761 # otherwise, add 'streamreqs' detailing our local revlog format
753 else:
762 else:
754 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
763 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
755 if repo.ui.configbool('experimental', 'bundle2-advertise'):
764 if repo.ui.configbool('experimental', 'bundle2-advertise'):
756 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
765 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
757 caps.append('bundle2=' + urlreq.quote(capsblob))
766 caps.append('bundle2=' + urlreq.quote(capsblob))
758 caps.append('unbundle=%s' % ','.join(bundle2.bundlepriority))
767 caps.append('unbundle=%s' % ','.join(bundle2.bundlepriority))
759
768
760 if proto.name == 'http':
769 if proto.name == 'http':
761 caps.append('httpheader=%d' %
770 caps.append('httpheader=%d' %
762 repo.ui.configint('server', 'maxhttpheaderlen'))
771 repo.ui.configint('server', 'maxhttpheaderlen'))
763 if repo.ui.configbool('experimental', 'httppostargs'):
772 if repo.ui.configbool('experimental', 'httppostargs'):
764 caps.append('httppostargs')
773 caps.append('httppostargs')
765
774
766 # FUTURE advertise 0.2rx once support is implemented
775 # FUTURE advertise 0.2rx once support is implemented
767 # FUTURE advertise minrx and mintx after consulting config option
776 # FUTURE advertise minrx and mintx after consulting config option
768 caps.append('httpmediatype=0.1rx,0.1tx,0.2tx')
777 caps.append('httpmediatype=0.1rx,0.1tx,0.2tx')
769
778
770 compengines = supportedcompengines(repo.ui, proto, util.SERVERROLE)
779 compengines = supportedcompengines(repo.ui, proto, util.SERVERROLE)
771 if compengines:
780 if compengines:
772 comptypes = ','.join(urlreq.quote(e.wireprotosupport().name)
781 comptypes = ','.join(urlreq.quote(e.wireprotosupport().name)
773 for e in compengines)
782 for e in compengines)
774 caps.append('compression=%s' % comptypes)
783 caps.append('compression=%s' % comptypes)
775
784
776 return caps
785 return caps
777
786
778 # If you are writing an extension and consider wrapping this function. Wrap
787 # If you are writing an extension and consider wrapping this function. Wrap
779 # `_capabilities` instead.
788 # `_capabilities` instead.
780 @wireprotocommand('capabilities')
789 @wireprotocommand('capabilities')
781 def capabilities(repo, proto):
790 def capabilities(repo, proto):
782 return ' '.join(_capabilities(repo, proto))
791 return ' '.join(_capabilities(repo, proto))
783
792
784 @wireprotocommand('changegroup', 'roots')
793 @wireprotocommand('changegroup', 'roots')
785 def changegroup(repo, proto, roots):
794 def changegroup(repo, proto, roots):
786 nodes = decodelist(roots)
795 nodes = decodelist(roots)
787 cg = changegroupmod.changegroup(repo, nodes, 'serve')
796 cg = changegroupmod.changegroup(repo, nodes, 'serve')
788 return streamres(reader=cg, v1compressible=True)
797 return streamres(reader=cg, v1compressible=True)
789
798
790 @wireprotocommand('changegroupsubset', 'bases heads')
799 @wireprotocommand('changegroupsubset', 'bases heads')
791 def changegroupsubset(repo, proto, bases, heads):
800 def changegroupsubset(repo, proto, bases, heads):
792 bases = decodelist(bases)
801 bases = decodelist(bases)
793 heads = decodelist(heads)
802 heads = decodelist(heads)
794 cg = changegroupmod.changegroupsubset(repo, bases, heads, 'serve')
803 cg = changegroupmod.changegroupsubset(repo, bases, heads, 'serve')
795 return streamres(reader=cg, v1compressible=True)
804 return streamres(reader=cg, v1compressible=True)
796
805
797 @wireprotocommand('debugwireargs', 'one two *')
806 @wireprotocommand('debugwireargs', 'one two *')
798 def debugwireargs(repo, proto, one, two, others):
807 def debugwireargs(repo, proto, one, two, others):
799 # only accept optional args from the known set
808 # only accept optional args from the known set
800 opts = options('debugwireargs', ['three', 'four'], others)
809 opts = options('debugwireargs', ['three', 'four'], others)
801 return repo.debugwireargs(one, two, **opts)
810 return repo.debugwireargs(one, two, **opts)
802
811
803 @wireprotocommand('getbundle', '*')
812 @wireprotocommand('getbundle', '*')
804 def getbundle(repo, proto, others):
813 def getbundle(repo, proto, others):
805 opts = options('getbundle', gboptsmap.keys(), others)
814 opts = options('getbundle', gboptsmap.keys(), others)
806 for k, v in opts.iteritems():
815 for k, v in opts.iteritems():
807 keytype = gboptsmap[k]
816 keytype = gboptsmap[k]
808 if keytype == 'nodes':
817 if keytype == 'nodes':
809 opts[k] = decodelist(v)
818 opts[k] = decodelist(v)
810 elif keytype == 'csv':
819 elif keytype == 'csv':
811 opts[k] = list(v.split(','))
820 opts[k] = list(v.split(','))
812 elif keytype == 'scsv':
821 elif keytype == 'scsv':
813 opts[k] = set(v.split(','))
822 opts[k] = set(v.split(','))
814 elif keytype == 'boolean':
823 elif keytype == 'boolean':
815 # Client should serialize False as '0', which is a non-empty string
824 # Client should serialize False as '0', which is a non-empty string
816 # so it evaluates as a True bool.
825 # so it evaluates as a True bool.
817 if v == '0':
826 if v == '0':
818 opts[k] = False
827 opts[k] = False
819 else:
828 else:
820 opts[k] = bool(v)
829 opts[k] = bool(v)
821 elif keytype != 'plain':
830 elif keytype != 'plain':
822 raise KeyError('unknown getbundle option type %s'
831 raise KeyError('unknown getbundle option type %s'
823 % keytype)
832 % keytype)
824
833
825 if not bundle1allowed(repo, 'pull'):
834 if not bundle1allowed(repo, 'pull'):
826 if not exchange.bundle2requested(opts.get('bundlecaps')):
835 if not exchange.bundle2requested(opts.get('bundlecaps')):
827 if proto.name == 'http':
836 if proto.name == 'http':
828 return ooberror(bundle2required)
837 return ooberror(bundle2required)
829 raise error.Abort(bundle2requiredmain,
838 raise error.Abort(bundle2requiredmain,
830 hint=bundle2requiredhint)
839 hint=bundle2requiredhint)
831
840
832 try:
841 try:
833 if repo.ui.configbool('server', 'disablefullbundle'):
842 if repo.ui.configbool('server', 'disablefullbundle'):
834 # Check to see if this is a full clone.
843 # Check to see if this is a full clone.
835 clheads = set(repo.changelog.heads())
844 clheads = set(repo.changelog.heads())
836 heads = set(opts.get('heads', set()))
845 heads = set(opts.get('heads', set()))
837 common = set(opts.get('common', set()))
846 common = set(opts.get('common', set()))
838 common.discard(nullid)
847 common.discard(nullid)
839 if not common and clheads == heads:
848 if not common and clheads == heads:
840 raise error.Abort(
849 raise error.Abort(
841 _('server has pull-based clones disabled'),
850 _('server has pull-based clones disabled'),
842 hint=_('remove --pull if specified or upgrade Mercurial'))
851 hint=_('remove --pull if specified or upgrade Mercurial'))
843
852
844 chunks = exchange.getbundlechunks(repo, 'serve', **opts)
853 chunks = exchange.getbundlechunks(repo, 'serve', **opts)
845 except error.Abort as exc:
854 except error.Abort as exc:
846 # cleanly forward Abort error to the client
855 # cleanly forward Abort error to the client
847 if not exchange.bundle2requested(opts.get('bundlecaps')):
856 if not exchange.bundle2requested(opts.get('bundlecaps')):
848 if proto.name == 'http':
857 if proto.name == 'http':
849 return ooberror(str(exc) + '\n')
858 return ooberror(str(exc) + '\n')
850 raise # cannot do better for bundle1 + ssh
859 raise # cannot do better for bundle1 + ssh
851 # bundle2 request expect a bundle2 reply
860 # bundle2 request expect a bundle2 reply
852 bundler = bundle2.bundle20(repo.ui)
861 bundler = bundle2.bundle20(repo.ui)
853 manargs = [('message', str(exc))]
862 manargs = [('message', str(exc))]
854 advargs = []
863 advargs = []
855 if exc.hint is not None:
864 if exc.hint is not None:
856 advargs.append(('hint', exc.hint))
865 advargs.append(('hint', exc.hint))
857 bundler.addpart(bundle2.bundlepart('error:abort',
866 bundler.addpart(bundle2.bundlepart('error:abort',
858 manargs, advargs))
867 manargs, advargs))
859 return streamres(gen=bundler.getchunks(), v1compressible=True)
868 return streamres(gen=bundler.getchunks(), v1compressible=True)
860 return streamres(gen=chunks, v1compressible=True)
869 return streamres(gen=chunks, v1compressible=True)
861
870
862 @wireprotocommand('heads')
871 @wireprotocommand('heads')
863 def heads(repo, proto):
872 def heads(repo, proto):
864 h = repo.heads()
873 h = repo.heads()
865 return encodelist(h) + "\n"
874 return encodelist(h) + "\n"
866
875
867 @wireprotocommand('hello')
876 @wireprotocommand('hello')
868 def hello(repo, proto):
877 def hello(repo, proto):
869 '''the hello command returns a set of lines describing various
878 '''the hello command returns a set of lines describing various
870 interesting things about the server, in an RFC822-like format.
879 interesting things about the server, in an RFC822-like format.
871 Currently the only one defined is "capabilities", which
880 Currently the only one defined is "capabilities", which
872 consists of a line in the form:
881 consists of a line in the form:
873
882
874 capabilities: space separated list of tokens
883 capabilities: space separated list of tokens
875 '''
884 '''
876 return "capabilities: %s\n" % (capabilities(repo, proto))
885 return "capabilities: %s\n" % (capabilities(repo, proto))
877
886
878 @wireprotocommand('listkeys', 'namespace')
887 @wireprotocommand('listkeys', 'namespace')
879 def listkeys(repo, proto, namespace):
888 def listkeys(repo, proto, namespace):
880 d = repo.listkeys(encoding.tolocal(namespace)).items()
889 d = repo.listkeys(encoding.tolocal(namespace)).items()
881 return pushkeymod.encodekeys(d)
890 return pushkeymod.encodekeys(d)
882
891
883 @wireprotocommand('lookup', 'key')
892 @wireprotocommand('lookup', 'key')
884 def lookup(repo, proto, key):
893 def lookup(repo, proto, key):
885 try:
894 try:
886 k = encoding.tolocal(key)
895 k = encoding.tolocal(key)
887 c = repo[k]
896 c = repo[k]
888 r = c.hex()
897 r = c.hex()
889 success = 1
898 success = 1
890 except Exception as inst:
899 except Exception as inst:
891 r = str(inst)
900 r = str(inst)
892 success = 0
901 success = 0
893 return "%s %s\n" % (success, r)
902 return "%s %s\n" % (success, r)
894
903
895 @wireprotocommand('known', 'nodes *')
904 @wireprotocommand('known', 'nodes *')
896 def known(repo, proto, nodes, others):
905 def known(repo, proto, nodes, others):
897 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
906 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
898
907
899 @wireprotocommand('pushkey', 'namespace key old new')
908 @wireprotocommand('pushkey', 'namespace key old new')
900 def pushkey(repo, proto, namespace, key, old, new):
909 def pushkey(repo, proto, namespace, key, old, new):
901 # compatibility with pre-1.8 clients which were accidentally
910 # compatibility with pre-1.8 clients which were accidentally
902 # sending raw binary nodes rather than utf-8-encoded hex
911 # sending raw binary nodes rather than utf-8-encoded hex
903 if len(new) == 20 and util.escapestr(new) != new:
912 if len(new) == 20 and util.escapestr(new) != new:
904 # looks like it could be a binary node
913 # looks like it could be a binary node
905 try:
914 try:
906 new.decode('utf-8')
915 new.decode('utf-8')
907 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
916 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
908 except UnicodeDecodeError:
917 except UnicodeDecodeError:
909 pass # binary, leave unmodified
918 pass # binary, leave unmodified
910 else:
919 else:
911 new = encoding.tolocal(new) # normal path
920 new = encoding.tolocal(new) # normal path
912
921
913 if util.safehasattr(proto, 'restore'):
922 if util.safehasattr(proto, 'restore'):
914
923
915 proto.redirect()
924 proto.redirect()
916
925
917 try:
926 try:
918 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
927 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
919 encoding.tolocal(old), new) or False
928 encoding.tolocal(old), new) or False
920 except error.Abort:
929 except error.Abort:
921 r = False
930 r = False
922
931
923 output = proto.restore()
932 output = proto.restore()
924
933
925 return '%s\n%s' % (int(r), output)
934 return '%s\n%s' % (int(r), output)
926
935
927 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
936 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
928 encoding.tolocal(old), new)
937 encoding.tolocal(old), new)
929 return '%s\n' % int(r)
938 return '%s\n' % int(r)
930
939
931 @wireprotocommand('stream_out')
940 @wireprotocommand('stream_out')
932 def stream(repo, proto):
941 def stream(repo, proto):
933 '''If the server supports streaming clone, it advertises the "stream"
942 '''If the server supports streaming clone, it advertises the "stream"
934 capability with a value representing the version and flags of the repo
943 capability with a value representing the version and flags of the repo
935 it is serving. Client checks to see if it understands the format.
944 it is serving. Client checks to see if it understands the format.
936 '''
945 '''
937 if not streamclone.allowservergeneration(repo):
946 if not streamclone.allowservergeneration(repo):
938 return '1\n'
947 return '1\n'
939
948
940 def getstream(it):
949 def getstream(it):
941 yield '0\n'
950 yield '0\n'
942 for chunk in it:
951 for chunk in it:
943 yield chunk
952 yield chunk
944
953
945 try:
954 try:
946 # LockError may be raised before the first result is yielded. Don't
955 # LockError may be raised before the first result is yielded. Don't
947 # emit output until we're sure we got the lock successfully.
956 # emit output until we're sure we got the lock successfully.
948 it = streamclone.generatev1wireproto(repo)
957 it = streamclone.generatev1wireproto(repo)
949 return streamres(gen=getstream(it))
958 return streamres(gen=getstream(it))
950 except error.LockError:
959 except error.LockError:
951 return '2\n'
960 return '2\n'
952
961
953 @wireprotocommand('unbundle', 'heads')
962 @wireprotocommand('unbundle', 'heads')
954 def unbundle(repo, proto, heads):
963 def unbundle(repo, proto, heads):
955 their_heads = decodelist(heads)
964 their_heads = decodelist(heads)
956
965
957 try:
966 try:
958 proto.redirect()
967 proto.redirect()
959
968
960 exchange.check_heads(repo, their_heads, 'preparing changes')
969 exchange.check_heads(repo, their_heads, 'preparing changes')
961
970
962 # write bundle data to temporary file because it can be big
971 # write bundle data to temporary file because it can be big
963 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
972 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
964 fp = os.fdopen(fd, pycompat.sysstr('wb+'))
973 fp = os.fdopen(fd, pycompat.sysstr('wb+'))
965 r = 0
974 r = 0
966 try:
975 try:
967 proto.getfile(fp)
976 proto.getfile(fp)
968 fp.seek(0)
977 fp.seek(0)
969 gen = exchange.readbundle(repo.ui, fp, None)
978 gen = exchange.readbundle(repo.ui, fp, None)
970 if (isinstance(gen, changegroupmod.cg1unpacker)
979 if (isinstance(gen, changegroupmod.cg1unpacker)
971 and not bundle1allowed(repo, 'push')):
980 and not bundle1allowed(repo, 'push')):
972 if proto.name == 'http':
981 if proto.name == 'http':
973 # need to special case http because stderr do not get to
982 # need to special case http because stderr do not get to
974 # the http client on failed push so we need to abuse some
983 # the http client on failed push so we need to abuse some
975 # other error type to make sure the message get to the
984 # other error type to make sure the message get to the
976 # user.
985 # user.
977 return ooberror(bundle2required)
986 return ooberror(bundle2required)
978 raise error.Abort(bundle2requiredmain,
987 raise error.Abort(bundle2requiredmain,
979 hint=bundle2requiredhint)
988 hint=bundle2requiredhint)
980
989
981 r = exchange.unbundle(repo, gen, their_heads, 'serve',
990 r = exchange.unbundle(repo, gen, their_heads, 'serve',
982 proto._client())
991 proto._client())
983 if util.safehasattr(r, 'addpart'):
992 if util.safehasattr(r, 'addpart'):
984 # The return looks streamable, we are in the bundle2 case and
993 # The return looks streamable, we are in the bundle2 case and
985 # should return a stream.
994 # should return a stream.
986 return streamres(gen=r.getchunks())
995 return streamres(gen=r.getchunks())
987 return pushres(r)
996 return pushres(r)
988
997
989 finally:
998 finally:
990 fp.close()
999 fp.close()
991 os.unlink(tempname)
1000 os.unlink(tempname)
992
1001
993 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
1002 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
994 # handle non-bundle2 case first
1003 # handle non-bundle2 case first
995 if not getattr(exc, 'duringunbundle2', False):
1004 if not getattr(exc, 'duringunbundle2', False):
996 try:
1005 try:
997 raise
1006 raise
998 except error.Abort:
1007 except error.Abort:
999 # The old code we moved used util.stderr directly.
1008 # The old code we moved used util.stderr directly.
1000 # We did not change it to minimise code change.
1009 # We did not change it to minimise code change.
1001 # This need to be moved to something proper.
1010 # This need to be moved to something proper.
1002 # Feel free to do it.
1011 # Feel free to do it.
1003 util.stderr.write("abort: %s\n" % exc)
1012 util.stderr.write("abort: %s\n" % exc)
1004 if exc.hint is not None:
1013 if exc.hint is not None:
1005 util.stderr.write("(%s)\n" % exc.hint)
1014 util.stderr.write("(%s)\n" % exc.hint)
1006 return pushres(0)
1015 return pushres(0)
1007 except error.PushRaced:
1016 except error.PushRaced:
1008 return pusherr(str(exc))
1017 return pusherr(str(exc))
1009
1018
1010 bundler = bundle2.bundle20(repo.ui)
1019 bundler = bundle2.bundle20(repo.ui)
1011 for out in getattr(exc, '_bundle2salvagedoutput', ()):
1020 for out in getattr(exc, '_bundle2salvagedoutput', ()):
1012 bundler.addpart(out)
1021 bundler.addpart(out)
1013 try:
1022 try:
1014 try:
1023 try:
1015 raise
1024 raise
1016 except error.PushkeyFailed as exc:
1025 except error.PushkeyFailed as exc:
1017 # check client caps
1026 # check client caps
1018 remotecaps = getattr(exc, '_replycaps', None)
1027 remotecaps = getattr(exc, '_replycaps', None)
1019 if (remotecaps is not None
1028 if (remotecaps is not None
1020 and 'pushkey' not in remotecaps.get('error', ())):
1029 and 'pushkey' not in remotecaps.get('error', ())):
1021 # no support remote side, fallback to Abort handler.
1030 # no support remote side, fallback to Abort handler.
1022 raise
1031 raise
1023 part = bundler.newpart('error:pushkey')
1032 part = bundler.newpart('error:pushkey')
1024 part.addparam('in-reply-to', exc.partid)
1033 part.addparam('in-reply-to', exc.partid)
1025 if exc.namespace is not None:
1034 if exc.namespace is not None:
1026 part.addparam('namespace', exc.namespace, mandatory=False)
1035 part.addparam('namespace', exc.namespace, mandatory=False)
1027 if exc.key is not None:
1036 if exc.key is not None:
1028 part.addparam('key', exc.key, mandatory=False)
1037 part.addparam('key', exc.key, mandatory=False)
1029 if exc.new is not None:
1038 if exc.new is not None:
1030 part.addparam('new', exc.new, mandatory=False)
1039 part.addparam('new', exc.new, mandatory=False)
1031 if exc.old is not None:
1040 if exc.old is not None:
1032 part.addparam('old', exc.old, mandatory=False)
1041 part.addparam('old', exc.old, mandatory=False)
1033 if exc.ret is not None:
1042 if exc.ret is not None:
1034 part.addparam('ret', exc.ret, mandatory=False)
1043 part.addparam('ret', exc.ret, mandatory=False)
1035 except error.BundleValueError as exc:
1044 except error.BundleValueError as exc:
1036 errpart = bundler.newpart('error:unsupportedcontent')
1045 errpart = bundler.newpart('error:unsupportedcontent')
1037 if exc.parttype is not None:
1046 if exc.parttype is not None:
1038 errpart.addparam('parttype', exc.parttype)
1047 errpart.addparam('parttype', exc.parttype)
1039 if exc.params:
1048 if exc.params:
1040 errpart.addparam('params', '\0'.join(exc.params))
1049 errpart.addparam('params', '\0'.join(exc.params))
1041 except error.Abort as exc:
1050 except error.Abort as exc:
1042 manargs = [('message', str(exc))]
1051 manargs = [('message', str(exc))]
1043 advargs = []
1052 advargs = []
1044 if exc.hint is not None:
1053 if exc.hint is not None:
1045 advargs.append(('hint', exc.hint))
1054 advargs.append(('hint', exc.hint))
1046 bundler.addpart(bundle2.bundlepart('error:abort',
1055 bundler.addpart(bundle2.bundlepart('error:abort',
1047 manargs, advargs))
1056 manargs, advargs))
1048 except error.PushRaced as exc:
1057 except error.PushRaced as exc:
1049 bundler.newpart('error:pushraced', [('message', str(exc))])
1058 bundler.newpart('error:pushraced', [('message', str(exc))])
1050 return streamres(gen=bundler.getchunks())
1059 return streamres(gen=bundler.getchunks())
@@ -1,24 +1,24 b''
1 # Disable the $CAP wire protocol capability.
1 # Disable the $CAP wire protocol capability.
2
2
3 if test -z "$CAP"
3 if test -z "$CAP"
4 then
4 then
5 echo "CAP environment variable not set."
5 echo "CAP environment variable not set."
6 fi
6 fi
7
7
8 cat > notcapable-$CAP.py << EOF
8 cat > notcapable-$CAP.py << EOF
9 from mercurial import extensions, peer, localrepo
9 from mercurial import extensions, localrepo, repository
10 def extsetup():
10 def extsetup():
11 extensions.wrapfunction(peer.peerrepository, 'capable', wrapcapable)
11 extensions.wrapfunction(repository.peer, 'capable', wrapcapable)
12 extensions.wrapfunction(localrepo.localrepository, 'peer', wrappeer)
12 extensions.wrapfunction(localrepo.localrepository, 'peer', wrappeer)
13 def wrapcapable(orig, self, name, *args, **kwargs):
13 def wrapcapable(orig, self, name, *args, **kwargs):
14 if name in '$CAP'.split(' '):
14 if name in '$CAP'.split(' '):
15 return False
15 return False
16 return orig(self, name, *args, **kwargs)
16 return orig(self, name, *args, **kwargs)
17 def wrappeer(orig, self):
17 def wrappeer(orig, self):
18 # Since we're disabling some newer features, we need to make sure local
18 # Since we're disabling some newer features, we need to make sure local
19 # repos add in the legacy features again.
19 # repos add in the legacy features again.
20 return localrepo.locallegacypeer(self)
20 return localrepo.locallegacypeer(self)
21 EOF
21 EOF
22
22
23 echo '[extensions]' >> $HGRCPATH
23 echo '[extensions]' >> $HGRCPATH
24 echo "notcapable-$CAP = `pwd`/notcapable-$CAP.py" >> $HGRCPATH
24 echo "notcapable-$CAP = `pwd`/notcapable-$CAP.py" >> $HGRCPATH
@@ -1,61 +1,80 b''
1 from __future__ import absolute_import, print_function
1 from __future__ import absolute_import, print_function
2
2
3 from mercurial import (
3 from mercurial import (
4 util,
4 util,
5 wireproto,
5 wireproto,
6 )
6 )
7 stringio = util.stringio
7 stringio = util.stringio
8
8
9 class proto(object):
9 class proto(object):
10 def __init__(self, args):
10 def __init__(self, args):
11 self.args = args
11 self.args = args
12 def getargs(self, spec):
12 def getargs(self, spec):
13 args = self.args
13 args = self.args
14 args.setdefault('*', {})
14 args.setdefault('*', {})
15 names = spec.split()
15 names = spec.split()
16 return [args[n] for n in names]
16 return [args[n] for n in names]
17
17
18 class clientpeer(wireproto.wirepeer):
18 class clientpeer(wireproto.wirepeer):
19 def __init__(self, serverrepo):
19 def __init__(self, serverrepo):
20 self.serverrepo = serverrepo
20 self.serverrepo = serverrepo
21
21
22 def _capabilities(self):
22 @property
23 def ui(self):
24 return self.serverrepo.ui
25
26 def url(self):
27 return 'test'
28
29 def local(self):
30 return None
31
32 def peer(self):
33 return self
34
35 def canpush(self):
36 return True
37
38 def close(self):
39 pass
40
41 def capabilities(self):
23 return ['batch']
42 return ['batch']
24
43
25 def _call(self, cmd, **args):
44 def _call(self, cmd, **args):
26 return wireproto.dispatch(self.serverrepo, proto(args), cmd)
45 return wireproto.dispatch(self.serverrepo, proto(args), cmd)
27
46
28 def _callstream(self, cmd, **args):
47 def _callstream(self, cmd, **args):
29 return stringio(self._call(cmd, **args))
48 return stringio(self._call(cmd, **args))
30
49
31 @wireproto.batchable
50 @wireproto.batchable
32 def greet(self, name):
51 def greet(self, name):
33 f = wireproto.future()
52 f = wireproto.future()
34 yield {'name': mangle(name)}, f
53 yield {'name': mangle(name)}, f
35 yield unmangle(f.value)
54 yield unmangle(f.value)
36
55
37 class serverrepo(object):
56 class serverrepo(object):
38 def greet(self, name):
57 def greet(self, name):
39 return "Hello, " + name
58 return "Hello, " + name
40
59
41 def filtered(self, name):
60 def filtered(self, name):
42 return self
61 return self
43
62
44 def mangle(s):
63 def mangle(s):
45 return ''.join(chr(ord(c) + 1) for c in s)
64 return ''.join(chr(ord(c) + 1) for c in s)
46 def unmangle(s):
65 def unmangle(s):
47 return ''.join(chr(ord(c) - 1) for c in s)
66 return ''.join(chr(ord(c) - 1) for c in s)
48
67
49 def greet(repo, proto, name):
68 def greet(repo, proto, name):
50 return mangle(repo.greet(unmangle(name)))
69 return mangle(repo.greet(unmangle(name)))
51
70
52 wireproto.commands['greet'] = (greet, 'name',)
71 wireproto.commands['greet'] = (greet, 'name',)
53
72
54 srv = serverrepo()
73 srv = serverrepo()
55 clt = clientpeer(srv)
74 clt = clientpeer(srv)
56
75
57 print(clt.greet("Foobar"))
76 print(clt.greet("Foobar"))
58 b = clt.iterbatch()
77 b = clt.iterbatch()
59 map(b.greet, ('Fo, =;:<o', 'Bar'))
78 map(b.greet, ('Fo, =;:<o', 'Bar'))
60 b.submit()
79 b.submit()
61 print([r for r in b.results()])
80 print([r for r in b.results()])
General Comments 0
You need to be logged in to leave comments. Login now