##// END OF EJS Templates
wireproto: in batch queries, support queries with immediate responses...
Valentin Gatien-Baron -
r41085:55e8da48 default
parent child Browse files
Show More
@@ -0,0 +1,46 b''
1 Checking how hg behaves when one side of a pull/push doesn't support
2 some capability (because it's running an older hg version, usually).
3
4 $ hg init repo1
5 $ cd repo1
6 $ echo a > a; hg add -q a; hg commit -q -m a
7 $ hg bookmark a
8 $ hg clone -q . ../repo2
9 $ cd ../repo2
10
11 $ touch $TESTTMP/disable-lookup.py
12 $ disable_cap() {
13 > rm -f $TESTTMP/disable-lookup.pyc # pyc caching is buggy
14 > cat <<EOF > $TESTTMP/disable-lookup.py
15 > from mercurial import extensions, wireprotov1server
16 > def wcapabilities(orig, *args, **kwargs):
17 > cap = orig(*args, **kwargs)
18 > cap.remove('$1')
19 > return cap
20 > extensions.wrapfunction(wireprotov1server, '_capabilities', wcapabilities)
21 > EOF
22 > }
23 $ cat >> ../repo1/.hg/hgrc <<EOF
24 > [extensions]
25 > disable-lookup = $TESTTMP/disable-lookup.py
26 > EOF
27 $ cat >> .hg/hgrc <<EOF
28 > [ui]
29 > ssh = "$PYTHON" "$TESTDIR/dummyssh"
30 > EOF
31
32 $ hg pull ssh://user@dummy/repo1 -r tip -B a
33 pulling from ssh://user@dummy/repo1
34 no changes found
35
36 $ disable_cap lookup
37 $ hg pull ssh://user@dummy/repo1 -r tip -B a
38 pulling from ssh://user@dummy/repo1
39 abort: other repository doesn't support revision lookup, so a rev cannot be specified.
40 [255]
41
42 $ disable_cap pushkey
43 $ hg pull ssh://user@dummy/repo1 -r tip -B a
44 pulling from ssh://user@dummy/repo1
45 abort: remote bookmark a not found!
46 [255]
@@ -1,621 +1,624 b''
1 1 # wireprotov1peer.py - Client-side functionality for wire protocol version 1.
2 2 #
3 3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import hashlib
11 11 import sys
12 12 import weakref
13 13
14 14 from .i18n import _
15 15 from .node import (
16 16 bin,
17 17 )
18 18 from . import (
19 19 bundle2,
20 20 changegroup as changegroupmod,
21 21 encoding,
22 22 error,
23 23 pushkey as pushkeymod,
24 24 pycompat,
25 25 repository,
26 26 util,
27 27 wireprototypes,
28 28 )
29 29 from .utils import (
30 30 interfaceutil,
31 31 )
32 32
33 33 urlreq = util.urlreq
34 34
35 35 def batchable(f):
36 36 '''annotation for batchable methods
37 37
38 38 Such methods must implement a coroutine as follows:
39 39
40 40 @batchable
41 41 def sample(self, one, two=None):
42 42 # Build list of encoded arguments suitable for your wire protocol:
43 43 encargs = [('one', encode(one),), ('two', encode(two),)]
44 44 # Create future for injection of encoded result:
45 45 encresref = future()
46 46 # Return encoded arguments and future:
47 47 yield encargs, encresref
48 48 # Assuming the future to be filled with the result from the batched
49 49 # request now. Decode it:
50 50 yield decode(encresref.value)
51 51
52 52 The decorator returns a function which wraps this coroutine as a plain
53 53 method, but adds the original method as an attribute called "batchable",
54 54 which is used by remotebatch to split the call into separate encoding and
55 55 decoding phases.
56 56 '''
57 57 def plain(*args, **opts):
58 58 batchable = f(*args, **opts)
59 59 encargsorres, encresref = next(batchable)
60 60 if not encresref:
61 61 return encargsorres # a local result in this case
62 62 self = args[0]
63 63 cmd = pycompat.bytesurl(f.__name__) # ensure cmd is ascii bytestr
64 64 encresref.set(self._submitone(cmd, encargsorres))
65 65 return next(batchable)
66 66 setattr(plain, 'batchable', f)
67 67 setattr(plain, '__name__', f.__name__)
68 68 return plain
69 69
70 70 class future(object):
71 71 '''placeholder for a value to be set later'''
72 72 def set(self, value):
73 73 if util.safehasattr(self, 'value'):
74 74 raise error.RepoError("future is already set")
75 75 self.value = value
76 76
77 77 def encodebatchcmds(req):
78 78 """Return a ``cmds`` argument value for the ``batch`` command."""
79 79 escapearg = wireprototypes.escapebatcharg
80 80
81 81 cmds = []
82 82 for op, argsdict in req:
83 83 # Old servers didn't properly unescape argument names. So prevent
84 84 # the sending of argument names that may not be decoded properly by
85 85 # servers.
86 86 assert all(escapearg(k) == k for k in argsdict)
87 87
88 88 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
89 89 for k, v in argsdict.iteritems())
90 90 cmds.append('%s %s' % (op, args))
91 91
92 92 return ';'.join(cmds)
93 93
94 94 class unsentfuture(pycompat.futures.Future):
95 95 """A Future variation to represent an unsent command.
96 96
97 97 Because we buffer commands and don't submit them immediately, calling
98 98 ``result()`` on an unsent future could deadlock. Futures for buffered
99 99 commands are represented by this type, which wraps ``result()`` to
100 100 call ``sendcommands()``.
101 101 """
102 102
103 103 def result(self, timeout=None):
104 104 if self.done():
105 105 return pycompat.futures.Future.result(self, timeout)
106 106
107 107 self._peerexecutor.sendcommands()
108 108
109 109 # This looks like it will infinitely recurse. However,
110 110 # sendcommands() should modify __class__. This call serves as a check
111 111 # on that.
112 112 return self.result(timeout)
113 113
114 114 @interfaceutil.implementer(repository.ipeercommandexecutor)
115 115 class peerexecutor(object):
116 116 def __init__(self, peer):
117 117 self._peer = peer
118 118 self._sent = False
119 119 self._closed = False
120 120 self._calls = []
121 121 self._futures = weakref.WeakSet()
122 122 self._responseexecutor = None
123 123 self._responsef = None
124 124
125 125 def __enter__(self):
126 126 return self
127 127
128 128 def __exit__(self, exctype, excvalee, exctb):
129 129 self.close()
130 130
131 131 def callcommand(self, command, args):
132 132 if self._sent:
133 133 raise error.ProgrammingError('callcommand() cannot be used '
134 134 'after commands are sent')
135 135
136 136 if self._closed:
137 137 raise error.ProgrammingError('callcommand() cannot be used '
138 138 'after close()')
139 139
140 140 # Commands are dispatched through methods on the peer.
141 141 fn = getattr(self._peer, pycompat.sysstr(command), None)
142 142
143 143 if not fn:
144 144 raise error.ProgrammingError(
145 145 'cannot call command %s: method of same name not available '
146 146 'on peer' % command)
147 147
148 148 # Commands are either batchable or they aren't. If a command
149 149 # isn't batchable, we send it immediately because the executor
150 150 # can no longer accept new commands after a non-batchable command.
151 151 # If a command is batchable, we queue it for later. But we have
152 152 # to account for the case of a non-batchable command arriving after
153 153 # a batchable one and refuse to service it.
154 154
155 155 def addcall():
156 156 f = pycompat.futures.Future()
157 157 self._futures.add(f)
158 158 self._calls.append((command, args, fn, f))
159 159 return f
160 160
161 161 if getattr(fn, 'batchable', False):
162 162 f = addcall()
163 163
164 164 # But since we don't issue it immediately, we wrap its result()
165 165 # to trigger sending so we avoid deadlocks.
166 166 f.__class__ = unsentfuture
167 167 f._peerexecutor = self
168 168 else:
169 169 if self._calls:
170 170 raise error.ProgrammingError(
171 171 '%s is not batchable and cannot be called on a command '
172 172 'executor along with other commands' % command)
173 173
174 174 f = addcall()
175 175
176 176 # Non-batchable commands can never coexist with another command
177 177 # in this executor. So send the command immediately.
178 178 self.sendcommands()
179 179
180 180 return f
181 181
182 182 def sendcommands(self):
183 183 if self._sent:
184 184 return
185 185
186 186 if not self._calls:
187 187 return
188 188
189 189 self._sent = True
190 190
191 191 # Unhack any future types so caller seens a clean type and to break
192 192 # cycle between us and futures.
193 193 for f in self._futures:
194 194 if isinstance(f, unsentfuture):
195 195 f.__class__ = pycompat.futures.Future
196 196 f._peerexecutor = None
197 197
198 198 calls = self._calls
199 199 # Mainly to destroy references to futures.
200 200 self._calls = None
201 201
202 202 # Simple case of a single command. We call it synchronously.
203 203 if len(calls) == 1:
204 204 command, args, fn, f = calls[0]
205 205
206 206 # Future was cancelled. Ignore it.
207 207 if not f.set_running_or_notify_cancel():
208 208 return
209 209
210 210 try:
211 211 result = fn(**pycompat.strkwargs(args))
212 212 except Exception:
213 213 pycompat.future_set_exception_info(f, sys.exc_info()[1:])
214 214 else:
215 215 f.set_result(result)
216 216
217 217 return
218 218
219 219 # Batch commands are a bit harder. First, we have to deal with the
220 220 # @batchable coroutine. That's a bit annoying. Furthermore, we also
221 221 # need to preserve streaming. i.e. it should be possible for the
222 222 # futures to resolve as data is coming in off the wire without having
223 223 # to wait for the final byte of the final response. We do this by
224 224 # spinning up a thread to read the responses.
225 225
226 226 requests = []
227 227 states = []
228 228
229 229 for command, args, fn, f in calls:
230 230 # Future was cancelled. Ignore it.
231 231 if not f.set_running_or_notify_cancel():
232 232 continue
233 233
234 234 try:
235 235 batchable = fn.batchable(fn.__self__,
236 236 **pycompat.strkwargs(args))
237 237 except Exception:
238 238 pycompat.future_set_exception_info(f, sys.exc_info()[1:])
239 239 return
240 240
241 241 # Encoded arguments and future holding remote result.
242 242 try:
243 encodedargs, fremote = next(batchable)
243 encargsorres, fremote = next(batchable)
244 244 except Exception:
245 245 pycompat.future_set_exception_info(f, sys.exc_info()[1:])
246 246 return
247 247
248 requests.append((command, encodedargs))
249 states.append((command, f, batchable, fremote))
248 if not fremote:
249 f.set_result(encargsorres)
250 else:
251 requests.append((command, encargsorres))
252 states.append((command, f, batchable, fremote))
250 253
251 254 if not requests:
252 255 return
253 256
254 257 # This will emit responses in order they were executed.
255 258 wireresults = self._peer._submitbatch(requests)
256 259
257 260 # The use of a thread pool executor here is a bit weird for something
258 261 # that only spins up a single thread. However, thread management is
259 262 # hard and it is easy to encounter race conditions, deadlocks, etc.
260 263 # concurrent.futures already solves these problems and its thread pool
261 264 # executor has minimal overhead. So we use it.
262 265 self._responseexecutor = pycompat.futures.ThreadPoolExecutor(1)
263 266 self._responsef = self._responseexecutor.submit(self._readbatchresponse,
264 267 states, wireresults)
265 268
266 269 def close(self):
267 270 self.sendcommands()
268 271
269 272 if self._closed:
270 273 return
271 274
272 275 self._closed = True
273 276
274 277 if not self._responsef:
275 278 return
276 279
277 280 # We need to wait on our in-flight response and then shut down the
278 281 # executor once we have a result.
279 282 try:
280 283 self._responsef.result()
281 284 finally:
282 285 self._responseexecutor.shutdown(wait=True)
283 286 self._responsef = None
284 287 self._responseexecutor = None
285 288
286 289 # If any of our futures are still in progress, mark them as
287 290 # errored. Otherwise a result() could wait indefinitely.
288 291 for f in self._futures:
289 292 if not f.done():
290 293 f.set_exception(error.ResponseError(
291 294 _('unfulfilled batch command response')))
292 295
293 296 self._futures = None
294 297
295 298 def _readbatchresponse(self, states, wireresults):
296 299 # Executes in a thread to read data off the wire.
297 300
298 301 for command, f, batchable, fremote in states:
299 302 # Grab raw result off the wire and teach the internal future
300 303 # about it.
301 304 remoteresult = next(wireresults)
302 305 fremote.set(remoteresult)
303 306
304 307 # And ask the coroutine to decode that value.
305 308 try:
306 309 result = next(batchable)
307 310 except Exception:
308 311 pycompat.future_set_exception_info(f, sys.exc_info()[1:])
309 312 else:
310 313 f.set_result(result)
311 314
312 315 @interfaceutil.implementer(repository.ipeercommands,
313 316 repository.ipeerlegacycommands)
314 317 class wirepeer(repository.peer):
315 318 """Client-side interface for communicating with a peer repository.
316 319
317 320 Methods commonly call wire protocol commands of the same name.
318 321
319 322 See also httppeer.py and sshpeer.py for protocol-specific
320 323 implementations of this interface.
321 324 """
322 325 def commandexecutor(self):
323 326 return peerexecutor(self)
324 327
325 328 # Begin of ipeercommands interface.
326 329
327 330 def clonebundles(self):
328 331 self.requirecap('clonebundles', _('clone bundles'))
329 332 return self._call('clonebundles')
330 333
331 334 @batchable
332 335 def lookup(self, key):
333 336 self.requirecap('lookup', _('look up remote revision'))
334 337 f = future()
335 338 yield {'key': encoding.fromlocal(key)}, f
336 339 d = f.value
337 340 success, data = d[:-1].split(" ", 1)
338 341 if int(success):
339 342 yield bin(data)
340 343 else:
341 344 self._abort(error.RepoError(data))
342 345
343 346 @batchable
344 347 def heads(self):
345 348 f = future()
346 349 yield {}, f
347 350 d = f.value
348 351 try:
349 352 yield wireprototypes.decodelist(d[:-1])
350 353 except ValueError:
351 354 self._abort(error.ResponseError(_("unexpected response:"), d))
352 355
353 356 @batchable
354 357 def known(self, nodes):
355 358 f = future()
356 359 yield {'nodes': wireprototypes.encodelist(nodes)}, f
357 360 d = f.value
358 361 try:
359 362 yield [bool(int(b)) for b in pycompat.iterbytestr(d)]
360 363 except ValueError:
361 364 self._abort(error.ResponseError(_("unexpected response:"), d))
362 365
363 366 @batchable
364 367 def branchmap(self):
365 368 f = future()
366 369 yield {}, f
367 370 d = f.value
368 371 try:
369 372 branchmap = {}
370 373 for branchpart in d.splitlines():
371 374 branchname, branchheads = branchpart.split(' ', 1)
372 375 branchname = encoding.tolocal(urlreq.unquote(branchname))
373 376 branchheads = wireprototypes.decodelist(branchheads)
374 377 branchmap[branchname] = branchheads
375 378 yield branchmap
376 379 except TypeError:
377 380 self._abort(error.ResponseError(_("unexpected response:"), d))
378 381
379 382 @batchable
380 383 def listkeys(self, namespace):
381 384 if not self.capable('pushkey'):
382 385 yield {}, None
383 386 f = future()
384 387 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
385 388 yield {'namespace': encoding.fromlocal(namespace)}, f
386 389 d = f.value
387 390 self.ui.debug('received listkey for "%s": %i bytes\n'
388 391 % (namespace, len(d)))
389 392 yield pushkeymod.decodekeys(d)
390 393
391 394 @batchable
392 395 def pushkey(self, namespace, key, old, new):
393 396 if not self.capable('pushkey'):
394 397 yield False, None
395 398 f = future()
396 399 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
397 400 yield {'namespace': encoding.fromlocal(namespace),
398 401 'key': encoding.fromlocal(key),
399 402 'old': encoding.fromlocal(old),
400 403 'new': encoding.fromlocal(new)}, f
401 404 d = f.value
402 405 d, output = d.split('\n', 1)
403 406 try:
404 407 d = bool(int(d))
405 408 except ValueError:
406 409 raise error.ResponseError(
407 410 _('push failed (unexpected response):'), d)
408 411 for l in output.splitlines(True):
409 412 self.ui.status(_('remote: '), l)
410 413 yield d
411 414
412 415 def stream_out(self):
413 416 return self._callstream('stream_out')
414 417
415 418 def getbundle(self, source, **kwargs):
416 419 kwargs = pycompat.byteskwargs(kwargs)
417 420 self.requirecap('getbundle', _('look up remote changes'))
418 421 opts = {}
419 422 bundlecaps = kwargs.get('bundlecaps') or set()
420 423 for key, value in kwargs.iteritems():
421 424 if value is None:
422 425 continue
423 426 keytype = wireprototypes.GETBUNDLE_ARGUMENTS.get(key)
424 427 if keytype is None:
425 428 raise error.ProgrammingError(
426 429 'Unexpectedly None keytype for key %s' % key)
427 430 elif keytype == 'nodes':
428 431 value = wireprototypes.encodelist(value)
429 432 elif keytype == 'csv':
430 433 value = ','.join(value)
431 434 elif keytype == 'scsv':
432 435 value = ','.join(sorted(value))
433 436 elif keytype == 'boolean':
434 437 value = '%i' % bool(value)
435 438 elif keytype != 'plain':
436 439 raise KeyError('unknown getbundle option type %s'
437 440 % keytype)
438 441 opts[key] = value
439 442 f = self._callcompressable("getbundle", **pycompat.strkwargs(opts))
440 443 if any((cap.startswith('HG2') for cap in bundlecaps)):
441 444 return bundle2.getunbundler(self.ui, f)
442 445 else:
443 446 return changegroupmod.cg1unpacker(f, 'UN')
444 447
445 448 def unbundle(self, bundle, heads, url):
446 449 '''Send cg (a readable file-like object representing the
447 450 changegroup to push, typically a chunkbuffer object) to the
448 451 remote server as a bundle.
449 452
450 453 When pushing a bundle10 stream, return an integer indicating the
451 454 result of the push (see changegroup.apply()).
452 455
453 456 When pushing a bundle20 stream, return a bundle20 stream.
454 457
455 458 `url` is the url the client thinks it's pushing to, which is
456 459 visible to hooks.
457 460 '''
458 461
459 462 if heads != ['force'] and self.capable('unbundlehash'):
460 463 heads = wireprototypes.encodelist(
461 464 ['hashed', hashlib.sha1(''.join(sorted(heads))).digest()])
462 465 else:
463 466 heads = wireprototypes.encodelist(heads)
464 467
465 468 if util.safehasattr(bundle, 'deltaheader'):
466 469 # this a bundle10, do the old style call sequence
467 470 ret, output = self._callpush("unbundle", bundle, heads=heads)
468 471 if ret == "":
469 472 raise error.ResponseError(
470 473 _('push failed:'), output)
471 474 try:
472 475 ret = int(ret)
473 476 except ValueError:
474 477 raise error.ResponseError(
475 478 _('push failed (unexpected response):'), ret)
476 479
477 480 for l in output.splitlines(True):
478 481 self.ui.status(_('remote: '), l)
479 482 else:
480 483 # bundle2 push. Send a stream, fetch a stream.
481 484 stream = self._calltwowaystream('unbundle', bundle, heads=heads)
482 485 ret = bundle2.getunbundler(self.ui, stream)
483 486 return ret
484 487
485 488 # End of ipeercommands interface.
486 489
487 490 # Begin of ipeerlegacycommands interface.
488 491
489 492 def branches(self, nodes):
490 493 n = wireprototypes.encodelist(nodes)
491 494 d = self._call("branches", nodes=n)
492 495 try:
493 496 br = [tuple(wireprototypes.decodelist(b)) for b in d.splitlines()]
494 497 return br
495 498 except ValueError:
496 499 self._abort(error.ResponseError(_("unexpected response:"), d))
497 500
498 501 def between(self, pairs):
499 502 batch = 8 # avoid giant requests
500 503 r = []
501 504 for i in pycompat.xrange(0, len(pairs), batch):
502 505 n = " ".join([wireprototypes.encodelist(p, '-')
503 506 for p in pairs[i:i + batch]])
504 507 d = self._call("between", pairs=n)
505 508 try:
506 509 r.extend(l and wireprototypes.decodelist(l) or []
507 510 for l in d.splitlines())
508 511 except ValueError:
509 512 self._abort(error.ResponseError(_("unexpected response:"), d))
510 513 return r
511 514
512 515 def changegroup(self, nodes, source):
513 516 n = wireprototypes.encodelist(nodes)
514 517 f = self._callcompressable("changegroup", roots=n)
515 518 return changegroupmod.cg1unpacker(f, 'UN')
516 519
517 520 def changegroupsubset(self, bases, heads, source):
518 521 self.requirecap('changegroupsubset', _('look up remote changes'))
519 522 bases = wireprototypes.encodelist(bases)
520 523 heads = wireprototypes.encodelist(heads)
521 524 f = self._callcompressable("changegroupsubset",
522 525 bases=bases, heads=heads)
523 526 return changegroupmod.cg1unpacker(f, 'UN')
524 527
525 528 # End of ipeerlegacycommands interface.
526 529
527 530 def _submitbatch(self, req):
528 531 """run batch request <req> on the server
529 532
530 533 Returns an iterator of the raw responses from the server.
531 534 """
532 535 ui = self.ui
533 536 if ui.debugflag and ui.configbool('devel', 'debug.peer-request'):
534 537 ui.debug('devel-peer-request: batched-content\n')
535 538 for op, args in req:
536 539 msg = 'devel-peer-request: - %s (%d arguments)\n'
537 540 ui.debug(msg % (op, len(args)))
538 541
539 542 unescapearg = wireprototypes.unescapebatcharg
540 543
541 544 rsp = self._callstream("batch", cmds=encodebatchcmds(req))
542 545 chunk = rsp.read(1024)
543 546 work = [chunk]
544 547 while chunk:
545 548 while ';' not in chunk and chunk:
546 549 chunk = rsp.read(1024)
547 550 work.append(chunk)
548 551 merged = ''.join(work)
549 552 while ';' in merged:
550 553 one, merged = merged.split(';', 1)
551 554 yield unescapearg(one)
552 555 chunk = rsp.read(1024)
553 556 work = [merged, chunk]
554 557 yield unescapearg(''.join(work))
555 558
556 559 def _submitone(self, op, args):
557 560 return self._call(op, **pycompat.strkwargs(args))
558 561
559 562 def debugwireargs(self, one, two, three=None, four=None, five=None):
560 563 # don't pass optional arguments left at their default value
561 564 opts = {}
562 565 if three is not None:
563 566 opts[r'three'] = three
564 567 if four is not None:
565 568 opts[r'four'] = four
566 569 return self._call('debugwireargs', one=one, two=two, **opts)
567 570
568 571 def _call(self, cmd, **args):
569 572 """execute <cmd> on the server
570 573
571 574 The command is expected to return a simple string.
572 575
573 576 returns the server reply as a string."""
574 577 raise NotImplementedError()
575 578
576 579 def _callstream(self, cmd, **args):
577 580 """execute <cmd> on the server
578 581
579 582 The command is expected to return a stream. Note that if the
580 583 command doesn't return a stream, _callstream behaves
581 584 differently for ssh and http peers.
582 585
583 586 returns the server reply as a file like object.
584 587 """
585 588 raise NotImplementedError()
586 589
587 590 def _callcompressable(self, cmd, **args):
588 591 """execute <cmd> on the server
589 592
590 593 The command is expected to return a stream.
591 594
592 595 The stream may have been compressed in some implementations. This
593 596 function takes care of the decompression. This is the only difference
594 597 with _callstream.
595 598
596 599 returns the server reply as a file like object.
597 600 """
598 601 raise NotImplementedError()
599 602
600 603 def _callpush(self, cmd, fp, **args):
601 604 """execute a <cmd> on server
602 605
603 606 The command is expected to be related to a push. Push has a special
604 607 return method.
605 608
606 609 returns the server reply as a (ret, output) tuple. ret is either
607 610 empty (error) or a stringified int.
608 611 """
609 612 raise NotImplementedError()
610 613
611 614 def _calltwowaystream(self, cmd, fp, **args):
612 615 """execute <cmd> on server
613 616
614 617 The command will send a stream to the server and get a stream in reply.
615 618 """
616 619 raise NotImplementedError()
617 620
618 621 def _abort(self, exception):
619 622 """clearly abort the wire protocol connection and raise the exception
620 623 """
621 624 raise NotImplementedError()
General Comments 0
You need to be logged in to leave comments. Login now