##// END OF EJS Templates
sshpeer: rename sshpeer class to sshv1peer (API)...
Gregory Szorc -
r35995:625038cb default
parent child Browse files
Show More
@@ -1,205 +1,205
1 1 # Copyright 2009-2010 Gregory P. Ward
2 2 # Copyright 2009-2010 Intelerad Medical Systems Incorporated
3 3 # Copyright 2010-2011 Fog Creek Software
4 4 # Copyright 2010-2011 Unity Technologies
5 5 #
6 6 # This software may be used and distributed according to the terms of the
7 7 # GNU General Public License version 2 or any later version.
8 8
9 9 '''setup for largefiles extension: uisetup'''
10 10 from __future__ import absolute_import
11 11
12 12 from mercurial.i18n import _
13 13
14 14 from mercurial.hgweb import (
15 15 hgweb_mod,
16 16 webcommands,
17 17 )
18 18
19 19 from mercurial import (
20 20 archival,
21 21 cmdutil,
22 22 commands,
23 23 copies,
24 24 exchange,
25 25 extensions,
26 26 filemerge,
27 27 hg,
28 28 httppeer,
29 29 merge,
30 30 scmutil,
31 31 sshpeer,
32 32 subrepo,
33 33 upgrade,
34 34 url,
35 35 wireproto,
36 36 )
37 37
38 38 from . import (
39 39 overrides,
40 40 proto,
41 41 )
42 42
43 43 def uisetup(ui):
44 44 # Disable auto-status for some commands which assume that all
45 45 # files in the result are under Mercurial's control
46 46
47 47 entry = extensions.wrapcommand(commands.table, 'add',
48 48 overrides.overrideadd)
49 49 addopt = [('', 'large', None, _('add as largefile')),
50 50 ('', 'normal', None, _('add as normal file')),
51 51 ('', 'lfsize', '', _('add all files above this size '
52 52 '(in megabytes) as largefiles '
53 53 '(default: 10)'))]
54 54 entry[1].extend(addopt)
55 55
56 56 # The scmutil function is called both by the (trivial) addremove command,
57 57 # and in the process of handling commit -A (issue3542)
58 58 extensions.wrapfunction(scmutil, 'addremove', overrides.scmutiladdremove)
59 59 extensions.wrapfunction(cmdutil, 'add', overrides.cmdutiladd)
60 60 extensions.wrapfunction(cmdutil, 'remove', overrides.cmdutilremove)
61 61 extensions.wrapfunction(cmdutil, 'forget', overrides.cmdutilforget)
62 62
63 63 extensions.wrapfunction(copies, 'pathcopies', overrides.copiespathcopies)
64 64
65 65 extensions.wrapfunction(upgrade, 'preservedrequirements',
66 66 overrides.upgraderequirements)
67 67
68 68 extensions.wrapfunction(upgrade, 'supporteddestrequirements',
69 69 overrides.upgraderequirements)
70 70
71 71 # Subrepos call status function
72 72 entry = extensions.wrapcommand(commands.table, 'status',
73 73 overrides.overridestatus)
74 74 extensions.wrapfunction(subrepo.hgsubrepo, 'status',
75 75 overrides.overridestatusfn)
76 76
77 77 entry = extensions.wrapcommand(commands.table, 'log',
78 78 overrides.overridelog)
79 79 entry = extensions.wrapcommand(commands.table, 'rollback',
80 80 overrides.overriderollback)
81 81 entry = extensions.wrapcommand(commands.table, 'verify',
82 82 overrides.overrideverify)
83 83
84 84 verifyopt = [('', 'large', None,
85 85 _('verify that all largefiles in current revision exists')),
86 86 ('', 'lfa', None,
87 87 _('verify largefiles in all revisions, not just current')),
88 88 ('', 'lfc', None,
89 89 _('verify local largefile contents, not just existence'))]
90 90 entry[1].extend(verifyopt)
91 91
92 92 entry = extensions.wrapcommand(commands.table, 'debugstate',
93 93 overrides.overridedebugstate)
94 94 debugstateopt = [('', 'large', None, _('display largefiles dirstate'))]
95 95 entry[1].extend(debugstateopt)
96 96
97 97 outgoing = lambda orgfunc, *arg, **kwargs: orgfunc(*arg, **kwargs)
98 98 entry = extensions.wrapcommand(commands.table, 'outgoing', outgoing)
99 99 outgoingopt = [('', 'large', None, _('display outgoing largefiles'))]
100 100 entry[1].extend(outgoingopt)
101 101 cmdutil.outgoinghooks.add('largefiles', overrides.outgoinghook)
102 102 entry = extensions.wrapcommand(commands.table, 'summary',
103 103 overrides.overridesummary)
104 104 summaryopt = [('', 'large', None, _('display outgoing largefiles'))]
105 105 entry[1].extend(summaryopt)
106 106 cmdutil.summaryremotehooks.add('largefiles', overrides.summaryremotehook)
107 107
108 108 entry = extensions.wrapcommand(commands.table, 'pull',
109 109 overrides.overridepull)
110 110 pullopt = [('', 'all-largefiles', None,
111 111 _('download all pulled versions of largefiles (DEPRECATED)')),
112 112 ('', 'lfrev', [],
113 113 _('download largefiles for these revisions'), _('REV'))]
114 114 entry[1].extend(pullopt)
115 115
116 116 entry = extensions.wrapcommand(commands.table, 'push',
117 117 overrides.overridepush)
118 118 pushopt = [('', 'lfrev', [],
119 119 _('upload largefiles for these revisions'), _('REV'))]
120 120 entry[1].extend(pushopt)
121 121 extensions.wrapfunction(exchange, 'pushoperation',
122 122 overrides.exchangepushoperation)
123 123
124 124 entry = extensions.wrapcommand(commands.table, 'clone',
125 125 overrides.overrideclone)
126 126 cloneopt = [('', 'all-largefiles', None,
127 127 _('download all versions of all largefiles'))]
128 128 entry[1].extend(cloneopt)
129 129 extensions.wrapfunction(hg, 'clone', overrides.hgclone)
130 130 extensions.wrapfunction(hg, 'postshare', overrides.hgpostshare)
131 131
132 132 entry = extensions.wrapcommand(commands.table, 'cat',
133 133 overrides.overridecat)
134 134 extensions.wrapfunction(merge, '_checkunknownfile',
135 135 overrides.overridecheckunknownfile)
136 136 extensions.wrapfunction(merge, 'calculateupdates',
137 137 overrides.overridecalculateupdates)
138 138 extensions.wrapfunction(merge, 'recordupdates',
139 139 overrides.mergerecordupdates)
140 140 extensions.wrapfunction(merge, 'update', overrides.mergeupdate)
141 141 extensions.wrapfunction(filemerge, '_filemerge',
142 142 overrides.overridefilemerge)
143 143 extensions.wrapfunction(cmdutil, 'copy', overrides.overridecopy)
144 144
145 145 # Summary calls dirty on the subrepos
146 146 extensions.wrapfunction(subrepo.hgsubrepo, 'dirty', overrides.overridedirty)
147 147
148 148 extensions.wrapfunction(cmdutil, 'revert', overrides.overriderevert)
149 149
150 150 extensions.wrapcommand(commands.table, 'archive',
151 151 overrides.overridearchivecmd)
152 152 extensions.wrapfunction(archival, 'archive', overrides.overridearchive)
153 153 extensions.wrapfunction(subrepo.hgsubrepo, 'archive',
154 154 overrides.hgsubrepoarchive)
155 155 extensions.wrapfunction(webcommands, 'archive', overrides.hgwebarchive)
156 156 extensions.wrapfunction(cmdutil, 'bailifchanged',
157 157 overrides.overridebailifchanged)
158 158
159 159 extensions.wrapfunction(cmdutil, 'postcommitstatus',
160 160 overrides.postcommitstatus)
161 161 extensions.wrapfunction(scmutil, 'marktouched',
162 162 overrides.scmutilmarktouched)
163 163
164 164 extensions.wrapfunction(url, 'open',
165 165 overrides.openlargefile)
166 166
167 167 # create the new wireproto commands ...
168 168 wireproto.commands['putlfile'] = (proto.putlfile, 'sha')
169 169 wireproto.commands['getlfile'] = (proto.getlfile, 'sha')
170 170 wireproto.commands['statlfile'] = (proto.statlfile, 'sha')
171 171
172 172 # ... and wrap some existing ones
173 173 wireproto.commands['heads'] = (proto.heads, '')
174 174 wireproto.commands['lheads'] = (wireproto.heads, '')
175 175
176 176 # make putlfile behave the same as push and {get,stat}lfile behave
177 177 # the same as pull w.r.t. permissions checks
178 178 hgweb_mod.perms['putlfile'] = 'push'
179 179 hgweb_mod.perms['getlfile'] = 'pull'
180 180 hgweb_mod.perms['statlfile'] = 'pull'
181 181
182 182 extensions.wrapfunction(webcommands, 'decodepath', overrides.decodepath)
183 183
184 184 extensions.wrapfunction(wireproto, '_capabilities', proto._capabilities)
185 185
186 186 # can't do this in reposetup because it needs to have happened before
187 187 # wirerepo.__init__ is called
188 proto.ssholdcallstream = sshpeer.sshpeer._callstream
188 proto.ssholdcallstream = sshpeer.sshv1peer._callstream
189 189 proto.httpoldcallstream = httppeer.httppeer._callstream
190 sshpeer.sshpeer._callstream = proto.sshrepocallstream
190 sshpeer.sshv1peer._callstream = proto.sshrepocallstream
191 191 httppeer.httppeer._callstream = proto.httprepocallstream
192 192
193 193 # override some extensions' stuff as well
194 194 for name, module in extensions.extensions():
195 195 if name == 'purge':
196 196 extensions.wrapcommand(getattr(module, 'cmdtable'), 'purge',
197 197 overrides.overridepurge)
198 198 if name == 'rebase':
199 199 extensions.wrapcommand(getattr(module, 'cmdtable'), 'rebase',
200 200 overrides.overriderebase)
201 201 extensions.wrapfunction(module, 'rebase',
202 202 overrides.overriderebase)
203 203 if name == 'transplant':
204 204 extensions.wrapcommand(getattr(module, 'cmdtable'), 'transplant',
205 205 overrides.overridetransplant)
@@ -1,540 +1,540
1 1 # sshpeer.py - ssh repository proxy class for mercurial
2 2 #
3 3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import re
11 11 import uuid
12 12
13 13 from .i18n import _
14 14 from . import (
15 15 error,
16 16 pycompat,
17 17 util,
18 18 wireproto,
19 19 wireprotoserver,
20 20 )
21 21
22 22 def _serverquote(s):
23 23 """quote a string for the remote shell ... which we assume is sh"""
24 24 if not s:
25 25 return s
26 26 if re.match('[a-zA-Z0-9@%_+=:,./-]*$', s):
27 27 return s
28 28 return "'%s'" % s.replace("'", "'\\''")
29 29
30 30 def _forwardoutput(ui, pipe):
31 31 """display all data currently available on pipe as remote output.
32 32
33 33 This is non blocking."""
34 34 s = util.readpipe(pipe)
35 35 if s:
36 36 for l in s.splitlines():
37 37 ui.status(_("remote: "), l, '\n')
38 38
39 39 class doublepipe(object):
40 40 """Operate a side-channel pipe in addition of a main one
41 41
42 42 The side-channel pipe contains server output to be forwarded to the user
43 43 input. The double pipe will behave as the "main" pipe, but will ensure the
44 44 content of the "side" pipe is properly processed while we wait for blocking
45 45 call on the "main" pipe.
46 46
47 47 If large amounts of data are read from "main", the forward will cease after
48 48 the first bytes start to appear. This simplifies the implementation
49 49 without affecting actual output of sshpeer too much as we rarely issue
50 50 large read for data not yet emitted by the server.
51 51
52 52 The main pipe is expected to be a 'bufferedinputpipe' from the util module
53 53 that handle all the os specific bits. This class lives in this module
54 54 because it focus on behavior specific to the ssh protocol."""
55 55
56 56 def __init__(self, ui, main, side):
57 57 self._ui = ui
58 58 self._main = main
59 59 self._side = side
60 60
61 61 def _wait(self):
62 62 """wait until some data are available on main or side
63 63
64 64 return a pair of boolean (ismainready, issideready)
65 65
66 66 (This will only wait for data if the setup is supported by `util.poll`)
67 67 """
68 68 if getattr(self._main, 'hasbuffer', False): # getattr for classic pipe
69 69 return (True, True) # main has data, assume side is worth poking at.
70 70 fds = [self._main.fileno(), self._side.fileno()]
71 71 try:
72 72 act = util.poll(fds)
73 73 except NotImplementedError:
74 74 # non supported yet case, assume all have data.
75 75 act = fds
76 76 return (self._main.fileno() in act, self._side.fileno() in act)
77 77
78 78 def write(self, data):
79 79 return self._call('write', data)
80 80
81 81 def read(self, size):
82 82 r = self._call('read', size)
83 83 if size != 0 and not r:
84 84 # We've observed a condition that indicates the
85 85 # stdout closed unexpectedly. Check stderr one
86 86 # more time and snag anything that's there before
87 87 # letting anyone know the main part of the pipe
88 88 # closed prematurely.
89 89 _forwardoutput(self._ui, self._side)
90 90 return r
91 91
92 92 def readline(self):
93 93 return self._call('readline')
94 94
95 95 def _call(self, methname, data=None):
96 96 """call <methname> on "main", forward output of "side" while blocking
97 97 """
98 98 # data can be '' or 0
99 99 if (data is not None and not data) or self._main.closed:
100 100 _forwardoutput(self._ui, self._side)
101 101 return ''
102 102 while True:
103 103 mainready, sideready = self._wait()
104 104 if sideready:
105 105 _forwardoutput(self._ui, self._side)
106 106 if mainready:
107 107 meth = getattr(self._main, methname)
108 108 if data is None:
109 109 return meth()
110 110 else:
111 111 return meth(data)
112 112
113 113 def close(self):
114 114 return self._main.close()
115 115
116 116 def flush(self):
117 117 return self._main.flush()
118 118
119 119 def _cleanuppipes(ui, pipei, pipeo, pipee):
120 120 """Clean up pipes used by an SSH connection."""
121 121 if pipeo:
122 122 pipeo.close()
123 123 if pipei:
124 124 pipei.close()
125 125
126 126 if pipee:
127 127 # Try to read from the err descriptor until EOF.
128 128 try:
129 129 for l in pipee:
130 130 ui.status(_('remote: '), l)
131 131 except (IOError, ValueError):
132 132 pass
133 133
134 134 pipee.close()
135 135
136 136 def _makeconnection(ui, sshcmd, args, remotecmd, path, sshenv=None):
137 137 """Create an SSH connection to a server.
138 138
139 139 Returns a tuple of (process, stdin, stdout, stderr) for the
140 140 spawned process.
141 141 """
142 142 cmd = '%s %s %s' % (
143 143 sshcmd,
144 144 args,
145 145 util.shellquote('%s -R %s serve --stdio' % (
146 146 _serverquote(remotecmd), _serverquote(path))))
147 147
148 148 ui.debug('running %s\n' % cmd)
149 149 cmd = util.quotecommand(cmd)
150 150
151 151 # no buffer allow the use of 'select'
152 152 # feel free to remove buffering and select usage when we ultimately
153 153 # move to threading.
154 154 stdin, stdout, stderr, proc = util.popen4(cmd, bufsize=0, env=sshenv)
155 155
156 156 stdout = doublepipe(ui, util.bufferedinputpipe(stdout), stderr)
157 157 stdin = doublepipe(ui, stdin, stderr)
158 158
159 159 return proc, stdin, stdout, stderr
160 160
161 161 def _performhandshake(ui, stdin, stdout, stderr):
162 162 def badresponse():
163 163 msg = _('no suitable response from remote hg')
164 164 hint = ui.config('ui', 'ssherrorhint')
165 165 raise error.RepoError(msg, hint=hint)
166 166
167 167 # The handshake consists of sending wire protocol commands in reverse
168 168 # order of protocol implementation and then sniffing for a response
169 169 # to one of them.
170 170 #
171 171 # Those commands (from oldest to newest) are:
172 172 #
173 173 # ``between``
174 174 # Asks for the set of revisions between a pair of revisions. Command
175 175 # present in all Mercurial server implementations.
176 176 #
177 177 # ``hello``
178 178 # Instructs the server to advertise its capabilities. Introduced in
179 179 # Mercurial 0.9.1.
180 180 #
181 181 # ``upgrade``
182 182 # Requests upgrade from default transport protocol version 1 to
183 183 # a newer version. Introduced in Mercurial 4.6 as an experimental
184 184 # feature.
185 185 #
186 186 # The ``between`` command is issued with a request for the null
187 187 # range. If the remote is a Mercurial server, this request will
188 188 # generate a specific response: ``1\n\n``. This represents the
189 189 # wire protocol encoded value for ``\n``. We look for ``1\n\n``
190 190 # in the output stream and know this is the response to ``between``
191 191 # and we're at the end of our handshake reply.
192 192 #
193 193 # The response to the ``hello`` command will be a line with the
194 194 # length of the value returned by that command followed by that
195 195 # value. If the server doesn't support ``hello`` (which should be
196 196 # rare), that line will be ``0\n``. Otherwise, the value will contain
197 197 # RFC 822 like lines. Of these, the ``capabilities:`` line contains
198 198 # the capabilities of the server.
199 199 #
200 200 # The ``upgrade`` command isn't really a command in the traditional
201 201 # sense of version 1 of the transport because it isn't using the
202 202 # proper mechanism for formatting insteads: instead, it just encodes
203 203 # arguments on the line, delimited by spaces.
204 204 #
205 205 # The ``upgrade`` line looks like ``upgrade <token> <capabilities>``.
206 206 # If the server doesn't support protocol upgrades, it will reply to
207 207 # this line with ``0\n``. Otherwise, it emits an
208 208 # ``upgraded <token> <protocol>`` line to both stdout and stderr.
209 209 # Content immediately following this line describes additional
210 210 # protocol and server state.
211 211 #
212 212 # In addition to the responses to our command requests, the server
213 213 # may emit "banner" output on stdout. SSH servers are allowed to
214 214 # print messages to stdout on login. Issuing commands on connection
215 215 # allows us to flush this banner output from the server by scanning
216 216 # for output to our well-known ``between`` command. Of course, if
217 217 # the banner contains ``1\n\n``, this will throw off our detection.
218 218
219 219 requestlog = ui.configbool('devel', 'debug.peer-request')
220 220
221 221 # Generate a random token to help identify responses to version 2
222 222 # upgrade request.
223 223 token = bytes(uuid.uuid4())
224 224 upgradecaps = [
225 225 ('proto', wireprotoserver.SSHV2),
226 226 ]
227 227 upgradecaps = util.urlreq.urlencode(upgradecaps)
228 228
229 229 try:
230 230 pairsarg = '%s-%s' % ('0' * 40, '0' * 40)
231 231 handshake = [
232 232 'hello\n',
233 233 'between\n',
234 234 'pairs %d\n' % len(pairsarg),
235 235 pairsarg,
236 236 ]
237 237
238 238 # Request upgrade to version 2 if configured.
239 239 if ui.configbool('experimental', 'sshpeer.advertise-v2'):
240 240 ui.debug('sending upgrade request: %s %s\n' % (token, upgradecaps))
241 241 handshake.insert(0, 'upgrade %s %s\n' % (token, upgradecaps))
242 242
243 243 if requestlog:
244 244 ui.debug('devel-peer-request: hello\n')
245 245 ui.debug('sending hello command\n')
246 246 if requestlog:
247 247 ui.debug('devel-peer-request: between\n')
248 248 ui.debug('devel-peer-request: pairs: %d bytes\n' % len(pairsarg))
249 249 ui.debug('sending between command\n')
250 250
251 251 stdin.write(''.join(handshake))
252 252 stdin.flush()
253 253 except IOError:
254 254 badresponse()
255 255
256 256 # Assume version 1 of wire protocol by default.
257 257 protoname = wireprotoserver.SSHV1
258 258 reupgraded = re.compile(b'^upgraded %s (.*)$' % re.escape(token))
259 259
260 260 lines = ['', 'dummy']
261 261 max_noise = 500
262 262 while lines[-1] and max_noise:
263 263 try:
264 264 l = stdout.readline()
265 265 _forwardoutput(ui, stderr)
266 266
267 267 # Look for reply to protocol upgrade request. It has a token
268 268 # in it, so there should be no false positives.
269 269 m = reupgraded.match(l)
270 270 if m:
271 271 protoname = m.group(1)
272 272 ui.debug('protocol upgraded to %s\n' % protoname)
273 273 # If an upgrade was handled, the ``hello`` and ``between``
274 274 # requests are ignored. The next output belongs to the
275 275 # protocol, so stop scanning lines.
276 276 break
277 277
278 278 # Otherwise it could be a banner, ``0\n`` response if server
279 279 # doesn't support upgrade.
280 280
281 281 if lines[-1] == '1\n' and l == '\n':
282 282 break
283 283 if l:
284 284 ui.debug('remote: ', l)
285 285 lines.append(l)
286 286 max_noise -= 1
287 287 except IOError:
288 288 badresponse()
289 289 else:
290 290 badresponse()
291 291
292 292 caps = set()
293 293
294 294 # For version 1, we should see a ``capabilities`` line in response to the
295 295 # ``hello`` command.
296 296 if protoname == wireprotoserver.SSHV1:
297 297 for l in reversed(lines):
298 298 # Look for response to ``hello`` command. Scan from the back so
299 299 # we don't misinterpret banner output as the command reply.
300 300 if l.startswith('capabilities:'):
301 301 caps.update(l[:-1].split(':')[1].split())
302 302 break
303 303 elif protoname == wireprotoserver.SSHV2:
304 304 # We see a line with number of bytes to follow and then a value
305 305 # looking like ``capabilities: *``.
306 306 line = stdout.readline()
307 307 try:
308 308 valuelen = int(line)
309 309 except ValueError:
310 310 badresponse()
311 311
312 312 capsline = stdout.read(valuelen)
313 313 if not capsline.startswith('capabilities: '):
314 314 badresponse()
315 315
316 316 caps.update(capsline.split(':')[1].split())
317 317 # Trailing newline.
318 318 stdout.read(1)
319 319
320 320 # Error if we couldn't find capabilities, this means:
321 321 #
322 322 # 1. Remote isn't a Mercurial server
323 323 # 2. Remote is a <0.9.1 Mercurial server
324 324 # 3. Remote is a future Mercurial server that dropped ``hello``
325 325 # and other attempted handshake mechanisms.
326 326 if not caps:
327 327 badresponse()
328 328
329 329 return caps
330 330
331 class sshpeer(wireproto.wirepeer):
331 class sshv1peer(wireproto.wirepeer):
332 332 def __init__(self, ui, url, proc, stdin, stdout, stderr, caps):
333 333 """Create a peer from an existing SSH connection.
334 334
335 335 ``proc`` is a handle on the underlying SSH process.
336 336 ``stdin``, ``stdout``, and ``stderr`` are handles on the stdio
337 337 pipes for that process.
338 338 ``caps`` is a set of capabilities supported by the remote.
339 339 """
340 340 self._url = url
341 341 self._ui = ui
342 342 # self._subprocess is unused. Keeping a handle on the process
343 343 # holds a reference and prevents it from being garbage collected.
344 344 self._subprocess = proc
345 345 self._pipeo = stdin
346 346 self._pipei = stdout
347 347 self._pipee = stderr
348 348 self._caps = caps
349 349
350 350 # Begin of _basepeer interface.
351 351
352 352 @util.propertycache
353 353 def ui(self):
354 354 return self._ui
355 355
356 356 def url(self):
357 357 return self._url
358 358
359 359 def local(self):
360 360 return None
361 361
362 362 def peer(self):
363 363 return self
364 364
365 365 def canpush(self):
366 366 return True
367 367
368 368 def close(self):
369 369 pass
370 370
371 371 # End of _basepeer interface.
372 372
373 373 # Begin of _basewirecommands interface.
374 374
375 375 def capabilities(self):
376 376 return self._caps
377 377
378 378 # End of _basewirecommands interface.
379 379
380 380 def _readerr(self):
381 381 _forwardoutput(self.ui, self._pipee)
382 382
383 383 def _abort(self, exception):
384 384 self._cleanup()
385 385 raise exception
386 386
387 387 def _cleanup(self):
388 388 _cleanuppipes(self.ui, self._pipei, self._pipeo, self._pipee)
389 389
390 390 __del__ = _cleanup
391 391
392 392 def _submitbatch(self, req):
393 393 rsp = self._callstream("batch", cmds=wireproto.encodebatchcmds(req))
394 394 available = self._getamount()
395 395 # TODO this response parsing is probably suboptimal for large
396 396 # batches with large responses.
397 397 toread = min(available, 1024)
398 398 work = rsp.read(toread)
399 399 available -= toread
400 400 chunk = work
401 401 while chunk:
402 402 while ';' in work:
403 403 one, work = work.split(';', 1)
404 404 yield wireproto.unescapearg(one)
405 405 toread = min(available, 1024)
406 406 chunk = rsp.read(toread)
407 407 available -= toread
408 408 work += chunk
409 409 yield wireproto.unescapearg(work)
410 410
411 411 def _callstream(self, cmd, **args):
412 412 args = pycompat.byteskwargs(args)
413 413 if (self.ui.debugflag
414 414 and self.ui.configbool('devel', 'debug.peer-request')):
415 415 dbg = self.ui.debug
416 416 line = 'devel-peer-request: %s\n'
417 417 dbg(line % cmd)
418 418 for key, value in sorted(args.items()):
419 419 if not isinstance(value, dict):
420 420 dbg(line % ' %s: %d bytes' % (key, len(value)))
421 421 else:
422 422 for dk, dv in sorted(value.items()):
423 423 dbg(line % ' %s-%s: %d' % (key, dk, len(dv)))
424 424 self.ui.debug("sending %s command\n" % cmd)
425 425 self._pipeo.write("%s\n" % cmd)
426 426 _func, names = wireproto.commands[cmd]
427 427 keys = names.split()
428 428 wireargs = {}
429 429 for k in keys:
430 430 if k == '*':
431 431 wireargs['*'] = args
432 432 break
433 433 else:
434 434 wireargs[k] = args[k]
435 435 del args[k]
436 436 for k, v in sorted(wireargs.iteritems()):
437 437 self._pipeo.write("%s %d\n" % (k, len(v)))
438 438 if isinstance(v, dict):
439 439 for dk, dv in v.iteritems():
440 440 self._pipeo.write("%s %d\n" % (dk, len(dv)))
441 441 self._pipeo.write(dv)
442 442 else:
443 443 self._pipeo.write(v)
444 444 self._pipeo.flush()
445 445
446 446 return self._pipei
447 447
448 448 def _callcompressable(self, cmd, **args):
449 449 return self._callstream(cmd, **args)
450 450
451 451 def _call(self, cmd, **args):
452 452 self._callstream(cmd, **args)
453 453 return self._recv()
454 454
455 455 def _callpush(self, cmd, fp, **args):
456 456 r = self._call(cmd, **args)
457 457 if r:
458 458 return '', r
459 459 for d in iter(lambda: fp.read(4096), ''):
460 460 self._send(d)
461 461 self._send("", flush=True)
462 462 r = self._recv()
463 463 if r:
464 464 return '', r
465 465 return self._recv(), ''
466 466
467 467 def _calltwowaystream(self, cmd, fp, **args):
468 468 r = self._call(cmd, **args)
469 469 if r:
470 470 # XXX needs to be made better
471 471 raise error.Abort(_('unexpected remote reply: %s') % r)
472 472 for d in iter(lambda: fp.read(4096), ''):
473 473 self._send(d)
474 474 self._send("", flush=True)
475 475 return self._pipei
476 476
477 477 def _getamount(self):
478 478 l = self._pipei.readline()
479 479 if l == '\n':
480 480 self._readerr()
481 481 msg = _('check previous remote output')
482 482 self._abort(error.OutOfBandError(hint=msg))
483 483 self._readerr()
484 484 try:
485 485 return int(l)
486 486 except ValueError:
487 487 self._abort(error.ResponseError(_("unexpected response:"), l))
488 488
489 489 def _recv(self):
490 490 return self._pipei.read(self._getamount())
491 491
492 492 def _send(self, data, flush=False):
493 493 self._pipeo.write("%d\n" % len(data))
494 494 if data:
495 495 self._pipeo.write(data)
496 496 if flush:
497 497 self._pipeo.flush()
498 498 self._readerr()
499 499
500 500 def instance(ui, path, create):
501 501 """Create an SSH peer.
502 502
503 503 The returned object conforms to the ``wireproto.wirepeer`` interface.
504 504 """
505 505 u = util.url(path, parsequery=False, parsefragment=False)
506 506 if u.scheme != 'ssh' or not u.host or u.path is None:
507 507 raise error.RepoError(_("couldn't parse location %s") % path)
508 508
509 509 util.checksafessh(path)
510 510
511 511 if u.passwd is not None:
512 512 raise error.RepoError(_('password in URL not supported'))
513 513
514 514 sshcmd = ui.config('ui', 'ssh')
515 515 remotecmd = ui.config('ui', 'remotecmd')
516 516 sshaddenv = dict(ui.configitems('sshenv'))
517 517 sshenv = util.shellenviron(sshaddenv)
518 518 remotepath = u.path or '.'
519 519
520 520 args = util.sshargs(sshcmd, u.host, u.user, u.port)
521 521
522 522 if create:
523 523 cmd = '%s %s %s' % (sshcmd, args,
524 524 util.shellquote('%s init %s' %
525 525 (_serverquote(remotecmd), _serverquote(remotepath))))
526 526 ui.debug('running %s\n' % cmd)
527 527 res = ui.system(cmd, blockedtag='sshpeer', environ=sshenv)
528 528 if res != 0:
529 529 raise error.RepoError(_('could not create remote repo'))
530 530
531 531 proc, stdin, stdout, stderr = _makeconnection(ui, sshcmd, args, remotecmd,
532 532 remotepath, sshenv)
533 533
534 534 try:
535 535 caps = _performhandshake(ui, stdin, stdout, stderr)
536 536 except Exception:
537 537 _cleanuppipes(ui, stdout, stdin, stderr)
538 538 raise
539 539
540 return sshpeer(ui, path, proc, stdin, stdout, stderr, caps)
540 return sshv1peer(ui, path, proc, stdin, stdout, stderr, caps)
@@ -1,74 +1,74
1 1 # Test that certain objects conform to well-defined interfaces.
2 2
3 3 from __future__ import absolute_import, print_function
4 4
5 5 from mercurial import (
6 6 bundlerepo,
7 7 httppeer,
8 8 localrepo,
9 9 sshpeer,
10 10 statichttprepo,
11 11 ui as uimod,
12 12 unionrepo,
13 13 )
14 14
15 15 def checkobject(o):
16 16 """Verify a constructed object conforms to interface rules.
17 17
18 18 An object must have __abstractmethods__ defined.
19 19
20 20 All "public" attributes of the object (attributes not prefixed with
21 21 an underscore) must be in __abstractmethods__ or appear on a base class
22 22 with __abstractmethods__.
23 23 """
24 24 name = o.__class__.__name__
25 25
26 26 allowed = set()
27 27 for cls in o.__class__.__mro__:
28 28 if not getattr(cls, '__abstractmethods__', set()):
29 29 continue
30 30
31 31 allowed |= cls.__abstractmethods__
32 32 allowed |= {a for a in dir(cls) if not a.startswith('_')}
33 33
34 34 if not allowed:
35 35 print('%s does not have abstract methods' % name)
36 36 return
37 37
38 38 public = {a for a in dir(o) if not a.startswith('_')}
39 39
40 40 for attr in sorted(public - allowed):
41 41 print('public attributes not in abstract interface: %s.%s' % (
42 42 name, attr))
43 43
44 44 # Facilitates testing localpeer.
45 45 class dummyrepo(object):
46 46 def __init__(self):
47 47 self.ui = uimod.ui()
48 48 def filtered(self, name):
49 49 pass
50 50 def _restrictcapabilities(self, caps):
51 51 pass
52 52
53 53 # Facilitates testing sshpeer without requiring an SSH server.
54 54 class badpeer(httppeer.httppeer):
55 55 def __init__(self):
56 56 super(badpeer, self).__init__(uimod.ui(), 'http://localhost')
57 57 self.badattribute = True
58 58
59 59 def badmethod(self):
60 60 pass
61 61
62 62 def main():
63 63 ui = uimod.ui()
64 64
65 65 checkobject(badpeer())
66 66 checkobject(httppeer.httppeer(ui, 'http://localhost'))
67 67 checkobject(localrepo.localpeer(dummyrepo()))
68 checkobject(sshpeer.sshpeer(ui, 'ssh://localhost/foo', None, None, None,
68 checkobject(sshpeer.sshv1peer(ui, 'ssh://localhost/foo', None, None, None,
69 69 None, None))
70 70 checkobject(bundlerepo.bundlepeer(dummyrepo()))
71 71 checkobject(statichttprepo.statichttppeer(dummyrepo()))
72 72 checkobject(unionrepo.unionpeer(dummyrepo()))
73 73
74 74 main()
General Comments 0
You need to be logged in to leave comments. Login now