##// END OF EJS Templates
wireproto: use new peer interface...
Gregory Szorc -
r33805:dedab036 default
parent child Browse files
Show More
@@ -21,7 +21,6 b' from . import ('
21 error,
21 error,
22 httpconnection,
22 httpconnection,
23 pycompat,
23 pycompat,
24 repository,
25 statichttprepo,
24 statichttprepo,
26 url,
25 url,
27 util,
26 util,
@@ -87,7 +86,7 b' def _wraphttpresponse(resp):'
87
86
88 resp.__class__ = readerproxy
87 resp.__class__ = readerproxy
89
88
90 class httppeer(wireproto.wirepeer, repository.legacypeer):
89 class httppeer(wireproto.wirepeer):
91 def __init__(self, ui, path):
90 def __init__(self, ui, path):
92 self._path = path
91 self._path = path
93 self._caps = None
92 self._caps = None
@@ -107,9 +106,6 b' class httppeer(wireproto.wirepeer, repos'
107 self._urlopener = url.opener(ui, authinfo)
106 self._urlopener = url.opener(ui, authinfo)
108 self._requestbuilder = urlreq.request
107 self._requestbuilder = urlreq.request
109
108
110 # TODO remove once peerrepository isn't in inheritance.
111 self._capabilities = self.capabilities
112
113 def __del__(self):
109 def __del__(self):
114 urlopener = getattr(self, '_urlopener', None)
110 urlopener = getattr(self, '_urlopener', None)
115 if urlopener:
111 if urlopener:
@@ -8,7 +8,6 b''
8
8
9 from __future__ import absolute_import
9 from __future__ import absolute_import
10
10
11 from .i18n import _
12 from . import (
11 from . import (
13 error,
12 error,
14 util,
13 util,
@@ -95,46 +94,3 b' def batchable(f):'
95 return next(batchable)
94 return next(batchable)
96 setattr(plain, 'batchable', f)
95 setattr(plain, 'batchable', f)
97 return plain
96 return plain
98
99 class peerrepository(object):
100 def iterbatch(self):
101 """Batch requests but allow iterating over the results.
102
103 This is to allow interleaving responses with things like
104 progress updates for clients.
105 """
106 return localiterbatcher(self)
107
108 def capable(self, name):
109 '''tell whether repo supports named capability.
110 return False if not supported.
111 if boolean capability, return True.
112 if string capability, return string.'''
113 caps = self._capabilities()
114 if name in caps:
115 return True
116 name_eq = name + '='
117 for cap in caps:
118 if cap.startswith(name_eq):
119 return cap[len(name_eq):]
120 return False
121
122 def requirecap(self, name, purpose):
123 '''raise an exception if the given capability is not present'''
124 if not self.capable(name):
125 raise error.CapabilityError(
126 _('cannot %s; remote repository does not '
127 'support the %r capability') % (purpose, name))
128
129 def local(self):
130 '''return peer as a localrepo, or None'''
131 return None
132
133 def peer(self):
134 return self
135
136 def canpush(self):
137 return True
138
139 def close(self):
140 pass
@@ -13,7 +13,6 b' from .i18n import _'
13 from . import (
13 from . import (
14 error,
14 error,
15 pycompat,
15 pycompat,
16 repository,
17 util,
16 util,
18 wireproto,
17 wireproto,
19 )
18 )
@@ -115,7 +114,7 b' class doublepipe(object):'
115 def flush(self):
114 def flush(self):
116 return self._main.flush()
115 return self._main.flush()
117
116
118 class sshpeer(wireproto.wirepeer, repository.legacypeer):
117 class sshpeer(wireproto.wirepeer):
119 def __init__(self, ui, path, create=False):
118 def __init__(self, ui, path, create=False):
120 self._url = path
119 self._url = path
121 self._ui = ui
120 self._ui = ui
@@ -151,9 +150,6 b' class sshpeer(wireproto.wirepeer, reposi'
151
150
152 self._validaterepo(sshcmd, args, remotecmd)
151 self._validaterepo(sshcmd, args, remotecmd)
153
152
154 # TODO remove this alias once peerrepository inheritance is removed.
155 self._capabilities = self.capabilities
156
157 # Begin of _basepeer interface.
153 # Begin of _basepeer interface.
158
154
159 @util.propertycache
155 @util.propertycache
@@ -27,6 +27,7 b' from . import ('
27 peer,
27 peer,
28 pushkey as pushkeymod,
28 pushkey as pushkeymod,
29 pycompat,
29 pycompat,
30 repository,
30 streamclone,
31 streamclone,
31 util,
32 util,
32 )
33 )
@@ -212,7 +213,7 b" gboptsmap = {'heads': 'nodes',"
212
213
213 # client side
214 # client side
214
215
215 class wirepeer(peer.peerrepository):
216 class wirepeer(repository.legacypeer):
216 """Client-side interface for communicating with a peer repository.
217 """Client-side interface for communicating with a peer repository.
217
218
218 Methods commonly call wire protocol commands of the same name.
219 Methods commonly call wire protocol commands of the same name.
@@ -220,28 +221,7 b' class wirepeer(peer.peerrepository):'
220 See also httppeer.py and sshpeer.py for protocol-specific
221 See also httppeer.py and sshpeer.py for protocol-specific
221 implementations of this interface.
222 implementations of this interface.
222 """
223 """
223 def _submitbatch(self, req):
224 # Begin of basewirepeer interface.
224 """run batch request <req> on the server
225
226 Returns an iterator of the raw responses from the server.
227 """
228 rsp = self._callstream("batch", cmds=encodebatchcmds(req))
229 chunk = rsp.read(1024)
230 work = [chunk]
231 while chunk:
232 while ';' not in chunk and chunk:
233 chunk = rsp.read(1024)
234 work.append(chunk)
235 merged = ''.join(work)
236 while ';' in merged:
237 one, merged = merged.split(';', 1)
238 yield unescapearg(one)
239 chunk = rsp.read(1024)
240 work = [merged, chunk]
241 yield unescapearg(''.join(work))
242
243 def _submitone(self, op, args):
244 return self._call(op, **args)
245
225
246 def iterbatch(self):
226 def iterbatch(self):
247 return remoteiterbatcher(self)
227 return remoteiterbatcher(self)
@@ -293,26 +273,17 b' class wirepeer(peer.peerrepository):'
293 except TypeError:
273 except TypeError:
294 self._abort(error.ResponseError(_("unexpected response:"), d))
274 self._abort(error.ResponseError(_("unexpected response:"), d))
295
275
296 def branches(self, nodes):
276 @batchable
297 n = encodelist(nodes)
277 def listkeys(self, namespace):
298 d = self._call("branches", nodes=n)
278 if not self.capable('pushkey'):
299 try:
279 yield {}, None
300 br = [tuple(decodelist(b)) for b in d.splitlines()]
280 f = future()
301 return br
281 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
302 except ValueError:
282 yield {'namespace': encoding.fromlocal(namespace)}, f
303 self._abort(error.ResponseError(_("unexpected response:"), d))
283 d = f.value
304
284 self.ui.debug('received listkey for "%s": %i bytes\n'
305 def between(self, pairs):
285 % (namespace, len(d)))
306 batch = 8 # avoid giant requests
286 yield pushkeymod.decodekeys(d)
307 r = []
308 for i in xrange(0, len(pairs), batch):
309 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
310 d = self._call("between", pairs=n)
311 try:
312 r.extend(l and decodelist(l) or [] for l in d.splitlines())
313 except ValueError:
314 self._abort(error.ResponseError(_("unexpected response:"), d))
315 return r
316
287
317 @batchable
288 @batchable
318 def pushkey(self, namespace, key, old, new):
289 def pushkey(self, namespace, key, old, new):
@@ -335,34 +306,9 b' class wirepeer(peer.peerrepository):'
335 self.ui.status(_('remote: '), l)
306 self.ui.status(_('remote: '), l)
336 yield d
307 yield d
337
308
338 @batchable
339 def listkeys(self, namespace):
340 if not self.capable('pushkey'):
341 yield {}, None
342 f = future()
343 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
344 yield {'namespace': encoding.fromlocal(namespace)}, f
345 d = f.value
346 self.ui.debug('received listkey for "%s": %i bytes\n'
347 % (namespace, len(d)))
348 yield pushkeymod.decodekeys(d)
349
350 def stream_out(self):
309 def stream_out(self):
351 return self._callstream('stream_out')
310 return self._callstream('stream_out')
352
311
353 def changegroup(self, nodes, kind):
354 n = encodelist(nodes)
355 f = self._callcompressable("changegroup", roots=n)
356 return changegroupmod.cg1unpacker(f, 'UN')
357
358 def changegroupsubset(self, bases, heads, kind):
359 self.requirecap('changegroupsubset', _('look up remote changes'))
360 bases = encodelist(bases)
361 heads = encodelist(heads)
362 f = self._callcompressable("changegroupsubset",
363 bases=bases, heads=heads)
364 return changegroupmod.cg1unpacker(f, 'UN')
365
366 def getbundle(self, source, **kwargs):
312 def getbundle(self, source, **kwargs):
367 self.requirecap('getbundle', _('look up remote changes'))
313 self.requirecap('getbundle', _('look up remote changes'))
368 opts = {}
314 opts = {}
@@ -433,6 +379,69 b' class wirepeer(peer.peerrepository):'
433 ret = bundle2.getunbundler(self.ui, stream)
379 ret = bundle2.getunbundler(self.ui, stream)
434 return ret
380 return ret
435
381
382 # End of basewirepeer interface.
383
384 # Begin of baselegacywirepeer interface.
385
386 def branches(self, nodes):
387 n = encodelist(nodes)
388 d = self._call("branches", nodes=n)
389 try:
390 br = [tuple(decodelist(b)) for b in d.splitlines()]
391 return br
392 except ValueError:
393 self._abort(error.ResponseError(_("unexpected response:"), d))
394
395 def between(self, pairs):
396 batch = 8 # avoid giant requests
397 r = []
398 for i in xrange(0, len(pairs), batch):
399 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
400 d = self._call("between", pairs=n)
401 try:
402 r.extend(l and decodelist(l) or [] for l in d.splitlines())
403 except ValueError:
404 self._abort(error.ResponseError(_("unexpected response:"), d))
405 return r
406
407 def changegroup(self, nodes, kind):
408 n = encodelist(nodes)
409 f = self._callcompressable("changegroup", roots=n)
410 return changegroupmod.cg1unpacker(f, 'UN')
411
412 def changegroupsubset(self, bases, heads, kind):
413 self.requirecap('changegroupsubset', _('look up remote changes'))
414 bases = encodelist(bases)
415 heads = encodelist(heads)
416 f = self._callcompressable("changegroupsubset",
417 bases=bases, heads=heads)
418 return changegroupmod.cg1unpacker(f, 'UN')
419
420 # End of baselegacywirepeer interface.
421
422 def _submitbatch(self, req):
423 """run batch request <req> on the server
424
425 Returns an iterator of the raw responses from the server.
426 """
427 rsp = self._callstream("batch", cmds=encodebatchcmds(req))
428 chunk = rsp.read(1024)
429 work = [chunk]
430 while chunk:
431 while ';' not in chunk and chunk:
432 chunk = rsp.read(1024)
433 work.append(chunk)
434 merged = ''.join(work)
435 while ';' in merged:
436 one, merged = merged.split(';', 1)
437 yield unescapearg(one)
438 chunk = rsp.read(1024)
439 work = [merged, chunk]
440 yield unescapearg(''.join(work))
441
442 def _submitone(self, op, args):
443 return self._call(op, **args)
444
436 def debugwireargs(self, one, two, three=None, four=None, five=None):
445 def debugwireargs(self, one, two, three=None, four=None, five=None):
437 # don't pass optional arguments left at their default value
446 # don't pass optional arguments left at their default value
438 opts = {}
447 opts = {}
@@ -6,9 +6,9 b' then'
6 fi
6 fi
7
7
8 cat > notcapable-$CAP.py << EOF
8 cat > notcapable-$CAP.py << EOF
9 from mercurial import extensions, peer, localrepo
9 from mercurial import extensions, localrepo, repository
10 def extsetup():
10 def extsetup():
11 extensions.wrapfunction(peer.peerrepository, 'capable', wrapcapable)
11 extensions.wrapfunction(repository.peer, 'capable', wrapcapable)
12 extensions.wrapfunction(localrepo.localrepository, 'peer', wrappeer)
12 extensions.wrapfunction(localrepo.localrepository, 'peer', wrappeer)
13 def wrapcapable(orig, self, name, *args, **kwargs):
13 def wrapcapable(orig, self, name, *args, **kwargs):
14 if name in '$CAP'.split(' '):
14 if name in '$CAP'.split(' '):
@@ -19,7 +19,26 b' class clientpeer(wireproto.wirepeer):'
19 def __init__(self, serverrepo):
19 def __init__(self, serverrepo):
20 self.serverrepo = serverrepo
20 self.serverrepo = serverrepo
21
21
22 def _capabilities(self):
22 @property
23 def ui(self):
24 return self.serverrepo.ui
25
26 def url(self):
27 return 'test'
28
29 def local(self):
30 return None
31
32 def peer(self):
33 return self
34
35 def canpush(self):
36 return True
37
38 def close(self):
39 pass
40
41 def capabilities(self):
23 return ['batch']
42 return ['batch']
24
43
25 def _call(self, cmd, **args):
44 def _call(self, cmd, **args):
General Comments 0
You need to be logged in to leave comments. Login now