##// END OF EJS Templates
wireproto: use new peer interface...
Gregory Szorc -
r33805:dedab036 default
parent child Browse files
Show More
@@ -21,7 +21,6 b' from . import ('
21 21 error,
22 22 httpconnection,
23 23 pycompat,
24 repository,
25 24 statichttprepo,
26 25 url,
27 26 util,
@@ -87,7 +86,7 b' def _wraphttpresponse(resp):'
87 86
88 87 resp.__class__ = readerproxy
89 88
90 class httppeer(wireproto.wirepeer, repository.legacypeer):
89 class httppeer(wireproto.wirepeer):
91 90 def __init__(self, ui, path):
92 91 self._path = path
93 92 self._caps = None
@@ -107,9 +106,6 b' class httppeer(wireproto.wirepeer, repos'
107 106 self._urlopener = url.opener(ui, authinfo)
108 107 self._requestbuilder = urlreq.request
109 108
110 # TODO remove once peerrepository isn't in inheritance.
111 self._capabilities = self.capabilities
112
113 109 def __del__(self):
114 110 urlopener = getattr(self, '_urlopener', None)
115 111 if urlopener:
@@ -8,7 +8,6 b''
8 8
9 9 from __future__ import absolute_import
10 10
11 from .i18n import _
12 11 from . import (
13 12 error,
14 13 util,
@@ -95,46 +94,3 b' def batchable(f):'
95 94 return next(batchable)
96 95 setattr(plain, 'batchable', f)
97 96 return plain
98
99 class peerrepository(object):
100 def iterbatch(self):
101 """Batch requests but allow iterating over the results.
102
103 This is to allow interleaving responses with things like
104 progress updates for clients.
105 """
106 return localiterbatcher(self)
107
108 def capable(self, name):
109 '''tell whether repo supports named capability.
110 return False if not supported.
111 if boolean capability, return True.
112 if string capability, return string.'''
113 caps = self._capabilities()
114 if name in caps:
115 return True
116 name_eq = name + '='
117 for cap in caps:
118 if cap.startswith(name_eq):
119 return cap[len(name_eq):]
120 return False
121
122 def requirecap(self, name, purpose):
123 '''raise an exception if the given capability is not present'''
124 if not self.capable(name):
125 raise error.CapabilityError(
126 _('cannot %s; remote repository does not '
127 'support the %r capability') % (purpose, name))
128
129 def local(self):
130 '''return peer as a localrepo, or None'''
131 return None
132
133 def peer(self):
134 return self
135
136 def canpush(self):
137 return True
138
139 def close(self):
140 pass
@@ -13,7 +13,6 b' from .i18n import _'
13 13 from . import (
14 14 error,
15 15 pycompat,
16 repository,
17 16 util,
18 17 wireproto,
19 18 )
@@ -115,7 +114,7 b' class doublepipe(object):'
115 114 def flush(self):
116 115 return self._main.flush()
117 116
118 class sshpeer(wireproto.wirepeer, repository.legacypeer):
117 class sshpeer(wireproto.wirepeer):
119 118 def __init__(self, ui, path, create=False):
120 119 self._url = path
121 120 self._ui = ui
@@ -151,9 +150,6 b' class sshpeer(wireproto.wirepeer, reposi'
151 150
152 151 self._validaterepo(sshcmd, args, remotecmd)
153 152
154 # TODO remove this alias once peerrepository inheritance is removed.
155 self._capabilities = self.capabilities
156
157 153 # Begin of _basepeer interface.
158 154
159 155 @util.propertycache
@@ -27,6 +27,7 b' from . import ('
27 27 peer,
28 28 pushkey as pushkeymod,
29 29 pycompat,
30 repository,
30 31 streamclone,
31 32 util,
32 33 )
@@ -212,7 +213,7 b" gboptsmap = {'heads': 'nodes',"
212 213
213 214 # client side
214 215
215 class wirepeer(peer.peerrepository):
216 class wirepeer(repository.legacypeer):
216 217 """Client-side interface for communicating with a peer repository.
217 218
218 219 Methods commonly call wire protocol commands of the same name.
@@ -220,28 +221,7 b' class wirepeer(peer.peerrepository):'
220 221 See also httppeer.py and sshpeer.py for protocol-specific
221 222 implementations of this interface.
222 223 """
223 def _submitbatch(self, req):
224 """run batch request <req> on the server
225
226 Returns an iterator of the raw responses from the server.
227 """
228 rsp = self._callstream("batch", cmds=encodebatchcmds(req))
229 chunk = rsp.read(1024)
230 work = [chunk]
231 while chunk:
232 while ';' not in chunk and chunk:
233 chunk = rsp.read(1024)
234 work.append(chunk)
235 merged = ''.join(work)
236 while ';' in merged:
237 one, merged = merged.split(';', 1)
238 yield unescapearg(one)
239 chunk = rsp.read(1024)
240 work = [merged, chunk]
241 yield unescapearg(''.join(work))
242
243 def _submitone(self, op, args):
244 return self._call(op, **args)
224 # Begin of basewirepeer interface.
245 225
246 226 def iterbatch(self):
247 227 return remoteiterbatcher(self)
@@ -293,26 +273,17 b' class wirepeer(peer.peerrepository):'
293 273 except TypeError:
294 274 self._abort(error.ResponseError(_("unexpected response:"), d))
295 275
296 def branches(self, nodes):
297 n = encodelist(nodes)
298 d = self._call("branches", nodes=n)
299 try:
300 br = [tuple(decodelist(b)) for b in d.splitlines()]
301 return br
302 except ValueError:
303 self._abort(error.ResponseError(_("unexpected response:"), d))
304
305 def between(self, pairs):
306 batch = 8 # avoid giant requests
307 r = []
308 for i in xrange(0, len(pairs), batch):
309 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
310 d = self._call("between", pairs=n)
311 try:
312 r.extend(l and decodelist(l) or [] for l in d.splitlines())
313 except ValueError:
314 self._abort(error.ResponseError(_("unexpected response:"), d))
315 return r
276 @batchable
277 def listkeys(self, namespace):
278 if not self.capable('pushkey'):
279 yield {}, None
280 f = future()
281 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
282 yield {'namespace': encoding.fromlocal(namespace)}, f
283 d = f.value
284 self.ui.debug('received listkey for "%s": %i bytes\n'
285 % (namespace, len(d)))
286 yield pushkeymod.decodekeys(d)
316 287
317 288 @batchable
318 289 def pushkey(self, namespace, key, old, new):
@@ -335,34 +306,9 b' class wirepeer(peer.peerrepository):'
335 306 self.ui.status(_('remote: '), l)
336 307 yield d
337 308
338 @batchable
339 def listkeys(self, namespace):
340 if not self.capable('pushkey'):
341 yield {}, None
342 f = future()
343 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
344 yield {'namespace': encoding.fromlocal(namespace)}, f
345 d = f.value
346 self.ui.debug('received listkey for "%s": %i bytes\n'
347 % (namespace, len(d)))
348 yield pushkeymod.decodekeys(d)
349
350 309 def stream_out(self):
351 310 return self._callstream('stream_out')
352 311
353 def changegroup(self, nodes, kind):
354 n = encodelist(nodes)
355 f = self._callcompressable("changegroup", roots=n)
356 return changegroupmod.cg1unpacker(f, 'UN')
357
358 def changegroupsubset(self, bases, heads, kind):
359 self.requirecap('changegroupsubset', _('look up remote changes'))
360 bases = encodelist(bases)
361 heads = encodelist(heads)
362 f = self._callcompressable("changegroupsubset",
363 bases=bases, heads=heads)
364 return changegroupmod.cg1unpacker(f, 'UN')
365
366 312 def getbundle(self, source, **kwargs):
367 313 self.requirecap('getbundle', _('look up remote changes'))
368 314 opts = {}
@@ -433,6 +379,69 b' class wirepeer(peer.peerrepository):'
433 379 ret = bundle2.getunbundler(self.ui, stream)
434 380 return ret
435 381
382 # End of basewirepeer interface.
383
384 # Begin of baselegacywirepeer interface.
385
386 def branches(self, nodes):
387 n = encodelist(nodes)
388 d = self._call("branches", nodes=n)
389 try:
390 br = [tuple(decodelist(b)) for b in d.splitlines()]
391 return br
392 except ValueError:
393 self._abort(error.ResponseError(_("unexpected response:"), d))
394
395 def between(self, pairs):
396 batch = 8 # avoid giant requests
397 r = []
398 for i in xrange(0, len(pairs), batch):
399 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
400 d = self._call("between", pairs=n)
401 try:
402 r.extend(l and decodelist(l) or [] for l in d.splitlines())
403 except ValueError:
404 self._abort(error.ResponseError(_("unexpected response:"), d))
405 return r
406
407 def changegroup(self, nodes, kind):
408 n = encodelist(nodes)
409 f = self._callcompressable("changegroup", roots=n)
410 return changegroupmod.cg1unpacker(f, 'UN')
411
412 def changegroupsubset(self, bases, heads, kind):
413 self.requirecap('changegroupsubset', _('look up remote changes'))
414 bases = encodelist(bases)
415 heads = encodelist(heads)
416 f = self._callcompressable("changegroupsubset",
417 bases=bases, heads=heads)
418 return changegroupmod.cg1unpacker(f, 'UN')
419
420 # End of baselegacywirepeer interface.
421
422 def _submitbatch(self, req):
423 """run batch request <req> on the server
424
425 Returns an iterator of the raw responses from the server.
426 """
427 rsp = self._callstream("batch", cmds=encodebatchcmds(req))
428 chunk = rsp.read(1024)
429 work = [chunk]
430 while chunk:
431 while ';' not in chunk and chunk:
432 chunk = rsp.read(1024)
433 work.append(chunk)
434 merged = ''.join(work)
435 while ';' in merged:
436 one, merged = merged.split(';', 1)
437 yield unescapearg(one)
438 chunk = rsp.read(1024)
439 work = [merged, chunk]
440 yield unescapearg(''.join(work))
441
442 def _submitone(self, op, args):
443 return self._call(op, **args)
444
436 445 def debugwireargs(self, one, two, three=None, four=None, five=None):
437 446 # don't pass optional arguments left at their default value
438 447 opts = {}
@@ -6,9 +6,9 b' then'
6 6 fi
7 7
8 8 cat > notcapable-$CAP.py << EOF
9 from mercurial import extensions, peer, localrepo
9 from mercurial import extensions, localrepo, repository
10 10 def extsetup():
11 extensions.wrapfunction(peer.peerrepository, 'capable', wrapcapable)
11 extensions.wrapfunction(repository.peer, 'capable', wrapcapable)
12 12 extensions.wrapfunction(localrepo.localrepository, 'peer', wrappeer)
13 13 def wrapcapable(orig, self, name, *args, **kwargs):
14 14 if name in '$CAP'.split(' '):
@@ -19,7 +19,26 b' class clientpeer(wireproto.wirepeer):'
19 19 def __init__(self, serverrepo):
20 20 self.serverrepo = serverrepo
21 21
22 def _capabilities(self):
22 @property
23 def ui(self):
24 return self.serverrepo.ui
25
26 def url(self):
27 return 'test'
28
29 def local(self):
30 return None
31
32 def peer(self):
33 return self
34
35 def canpush(self):
36 return True
37
38 def close(self):
39 pass
40
41 def capabilities(self):
23 42 return ['batch']
24 43
25 44 def _call(self, cmd, **args):
General Comments 0
You need to be logged in to leave comments. Login now