##// END OF EJS Templates
wireprotov1peer: simplify the way batchable rpcs are defined...
Valentin Gatien-Baron -
r48686:cdad6560 default
parent child Browse files
Show More
@@ -1,670 +1,673 b''
1 1 # wireprotov1peer.py - Client-side functionality for wire protocol version 1.
2 2 #
3 3 # Copyright 2005-2010 Olivia Mackall <olivia@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import sys
11 11 import weakref
12 12
13 13 from .i18n import _
14 14 from .node import bin
15 15 from .pycompat import (
16 16 getattr,
17 17 setattr,
18 18 )
19 19 from . import (
20 20 bundle2,
21 21 changegroup as changegroupmod,
22 22 encoding,
23 23 error,
24 24 pushkey as pushkeymod,
25 25 pycompat,
26 26 util,
27 27 wireprototypes,
28 28 )
29 29 from .interfaces import (
30 30 repository,
31 31 util as interfaceutil,
32 32 )
33 33 from .utils import hashutil
34 34
35 35 urlreq = util.urlreq
36 36
37 37
38 def batchable(f):
38 def batchable_new_style(f):
39 39 """annotation for batchable methods
40 40
41 41 Such methods must implement a coroutine as follows:
42 42
43 43 @batchable
44 44 def sample(self, one, two=None):
45 45 # Build list of encoded arguments suitable for your wire protocol:
46 46 encoded_args = [('one', encode(one),), ('two', encode(two),)]
47 # Create future for injection of encoded result:
48 encoded_res_future = future()
49 # Return encoded arguments and future:
50 yield encoded_args, encoded_res_future
51 # Assuming the future to be filled with the result from the batched
52 # request now. Decode it:
53 yield decode(encoded_res_future.value)
47 # Return it, along with a function that will receive the result
48 # from the batched request.
49 return encoded_args, decode
54 50
55 51 The decorator returns a function which wraps this coroutine as a plain
56 52 method, but adds the original method as an attribute called "batchable",
57 53 which is used by remotebatch to split the call into separate encoding and
58 54 decoding phases.
59 55 """
60 56
61 57 def plain(*args, **opts):
62 batchable = f(*args, **opts)
63 encoded_args_or_res, encoded_res_future = next(batchable)
64 if not encoded_res_future:
58 encoded_args_or_res, decode = f(*args, **opts)
59 if not decode:
65 60 return encoded_args_or_res # a local result in this case
66 61 self = args[0]
67 62 cmd = pycompat.bytesurl(f.__name__) # ensure cmd is ascii bytestr
68 encoded_res_future.set(self._submitone(cmd, encoded_args_or_res))
69 return next(batchable)
63 encoded_res = self._submitone(cmd, encoded_args_or_res)
64 return decode(encoded_res)
70 65
71 66 setattr(plain, 'batchable', f)
72 67 setattr(plain, '__name__', f.__name__)
73 68 return plain
74 69
75 70
71 def batchable(f):
72 def upgraded(*args, **opts):
73 batchable = f(*args, **opts)
74 encoded_args_or_res, encoded_res_future = next(batchable)
75 if not encoded_res_future:
76 decode = None
77 else:
78
79 def decode(d):
80 encoded_res_future.set(d)
81 return next(batchable)
82
83 return encoded_args_or_res, decode
84
85 setattr(upgraded, '__name__', f.__name__)
86 return batchable_new_style(upgraded)
87
88
76 89 class future(object):
77 90 '''placeholder for a value to be set later'''
78 91
79 92 def set(self, value):
80 93 if util.safehasattr(self, b'value'):
81 94 raise error.RepoError(b"future is already set")
82 95 self.value = value
83 96
84 97
85 98 def encodebatchcmds(req):
86 99 """Return a ``cmds`` argument value for the ``batch`` command."""
87 100 escapearg = wireprototypes.escapebatcharg
88 101
89 102 cmds = []
90 103 for op, argsdict in req:
91 104 # Old servers didn't properly unescape argument names. So prevent
92 105 # the sending of argument names that may not be decoded properly by
93 106 # servers.
94 107 assert all(escapearg(k) == k for k in argsdict)
95 108
96 109 args = b','.join(
97 110 b'%s=%s' % (escapearg(k), escapearg(v))
98 111 for k, v in pycompat.iteritems(argsdict)
99 112 )
100 113 cmds.append(b'%s %s' % (op, args))
101 114
102 115 return b';'.join(cmds)
103 116
104 117
105 118 class unsentfuture(pycompat.futures.Future):
106 119 """A Future variation to represent an unsent command.
107 120
108 121 Because we buffer commands and don't submit them immediately, calling
109 122 ``result()`` on an unsent future could deadlock. Futures for buffered
110 123 commands are represented by this type, which wraps ``result()`` to
111 124 call ``sendcommands()``.
112 125 """
113 126
114 127 def result(self, timeout=None):
115 128 if self.done():
116 129 return pycompat.futures.Future.result(self, timeout)
117 130
118 131 self._peerexecutor.sendcommands()
119 132
120 133 # This looks like it will infinitely recurse. However,
121 134 # sendcommands() should modify __class__. This call serves as a check
122 135 # on that.
123 136 return self.result(timeout)
124 137
125 138
126 139 @interfaceutil.implementer(repository.ipeercommandexecutor)
127 140 class peerexecutor(object):
128 141 def __init__(self, peer):
129 142 self._peer = peer
130 143 self._sent = False
131 144 self._closed = False
132 145 self._calls = []
133 146 self._futures = weakref.WeakSet()
134 147 self._responseexecutor = None
135 148 self._responsef = None
136 149
137 150 def __enter__(self):
138 151 return self
139 152
140 153 def __exit__(self, exctype, excvalee, exctb):
141 154 self.close()
142 155
143 156 def callcommand(self, command, args):
144 157 if self._sent:
145 158 raise error.ProgrammingError(
146 159 b'callcommand() cannot be used after commands are sent'
147 160 )
148 161
149 162 if self._closed:
150 163 raise error.ProgrammingError(
151 164 b'callcommand() cannot be used after close()'
152 165 )
153 166
154 167 # Commands are dispatched through methods on the peer.
155 168 fn = getattr(self._peer, pycompat.sysstr(command), None)
156 169
157 170 if not fn:
158 171 raise error.ProgrammingError(
159 172 b'cannot call command %s: method of same name not available '
160 173 b'on peer' % command
161 174 )
162 175
163 176 # Commands are either batchable or they aren't. If a command
164 177 # isn't batchable, we send it immediately because the executor
165 178 # can no longer accept new commands after a non-batchable command.
166 179 # If a command is batchable, we queue it for later. But we have
167 180 # to account for the case of a non-batchable command arriving after
168 181 # a batchable one and refuse to service it.
169 182
170 183 def addcall():
171 184 f = pycompat.futures.Future()
172 185 self._futures.add(f)
173 186 self._calls.append((command, args, fn, f))
174 187 return f
175 188
176 189 if getattr(fn, 'batchable', False):
177 190 f = addcall()
178 191
179 192 # But since we don't issue it immediately, we wrap its result()
180 193 # to trigger sending so we avoid deadlocks.
181 194 f.__class__ = unsentfuture
182 195 f._peerexecutor = self
183 196 else:
184 197 if self._calls:
185 198 raise error.ProgrammingError(
186 199 b'%s is not batchable and cannot be called on a command '
187 200 b'executor along with other commands' % command
188 201 )
189 202
190 203 f = addcall()
191 204
192 205 # Non-batchable commands can never coexist with another command
193 206 # in this executor. So send the command immediately.
194 207 self.sendcommands()
195 208
196 209 return f
197 210
198 211 def sendcommands(self):
199 212 if self._sent:
200 213 return
201 214
202 215 if not self._calls:
203 216 return
204 217
205 218 self._sent = True
206 219
207 220 # Unhack any future types so caller seens a clean type and to break
208 221 # cycle between us and futures.
209 222 for f in self._futures:
210 223 if isinstance(f, unsentfuture):
211 224 f.__class__ = pycompat.futures.Future
212 225 f._peerexecutor = None
213 226
214 227 calls = self._calls
215 228 # Mainly to destroy references to futures.
216 229 self._calls = None
217 230
218 231 # Simple case of a single command. We call it synchronously.
219 232 if len(calls) == 1:
220 233 command, args, fn, f = calls[0]
221 234
222 235 # Future was cancelled. Ignore it.
223 236 if not f.set_running_or_notify_cancel():
224 237 return
225 238
226 239 try:
227 240 result = fn(**pycompat.strkwargs(args))
228 241 except Exception:
229 242 pycompat.future_set_exception_info(f, sys.exc_info()[1:])
230 243 else:
231 244 f.set_result(result)
232 245
233 246 return
234 247
235 248 # Batch commands are a bit harder. First, we have to deal with the
236 249 # @batchable coroutine. That's a bit annoying. Furthermore, we also
237 250 # need to preserve streaming. i.e. it should be possible for the
238 251 # futures to resolve as data is coming in off the wire without having
239 252 # to wait for the final byte of the final response. We do this by
240 253 # spinning up a thread to read the responses.
241 254
242 255 requests = []
243 256 states = []
244 257
245 258 for command, args, fn, f in calls:
246 259 # Future was cancelled. Ignore it.
247 260 if not f.set_running_or_notify_cancel():
248 261 continue
249 262
250 263 try:
251 batchable = fn.batchable(
264 encoded_args_or_res, decode = fn.batchable(
252 265 fn.__self__, **pycompat.strkwargs(args)
253 266 )
254 267 except Exception:
255 268 pycompat.future_set_exception_info(f, sys.exc_info()[1:])
256 269 return
257 270
258 # Encoded arguments and future holding remote result.
259 try:
260 encoded_args_or_res, fremote = next(batchable)
261 except Exception:
262 pycompat.future_set_exception_info(f, sys.exc_info()[1:])
263 return
264
265 if not fremote:
271 if not decode:
266 272 f.set_result(encoded_args_or_res)
267 273 else:
268 274 requests.append((command, encoded_args_or_res))
269 states.append((command, f, batchable, fremote))
275 states.append((command, f, batchable, decode))
270 276
271 277 if not requests:
272 278 return
273 279
274 280 # This will emit responses in order they were executed.
275 281 wireresults = self._peer._submitbatch(requests)
276 282
277 283 # The use of a thread pool executor here is a bit weird for something
278 284 # that only spins up a single thread. However, thread management is
279 285 # hard and it is easy to encounter race conditions, deadlocks, etc.
280 286 # concurrent.futures already solves these problems and its thread pool
281 287 # executor has minimal overhead. So we use it.
282 288 self._responseexecutor = pycompat.futures.ThreadPoolExecutor(1)
283 289 self._responsef = self._responseexecutor.submit(
284 290 self._readbatchresponse, states, wireresults
285 291 )
286 292
287 293 def close(self):
288 294 self.sendcommands()
289 295
290 296 if self._closed:
291 297 return
292 298
293 299 self._closed = True
294 300
295 301 if not self._responsef:
296 302 return
297 303
298 304 # We need to wait on our in-flight response and then shut down the
299 305 # executor once we have a result.
300 306 try:
301 307 self._responsef.result()
302 308 finally:
303 309 self._responseexecutor.shutdown(wait=True)
304 310 self._responsef = None
305 311 self._responseexecutor = None
306 312
307 313 # If any of our futures are still in progress, mark them as
308 314 # errored. Otherwise a result() could wait indefinitely.
309 315 for f in self._futures:
310 316 if not f.done():
311 317 f.set_exception(
312 318 error.ResponseError(
313 319 _(b'unfulfilled batch command response'), None
314 320 )
315 321 )
316 322
317 323 self._futures = None
318 324
319 325 def _readbatchresponse(self, states, wireresults):
320 326 # Executes in a thread to read data off the wire.
321 327
322 for command, f, batchable, fremote in states:
328 for command, f, batchable, decode in states:
323 329 # Grab raw result off the wire and teach the internal future
324 330 # about it.
325 331 try:
326 332 remoteresult = next(wireresults)
327 333 except StopIteration:
328 334 # This can happen in particular because next(batchable)
329 335 # in the previous iteration can call peer._abort, which
330 336 # may close the peer.
331 337 f.set_exception(
332 338 error.ResponseError(
333 339 _(b'unfulfilled batch command response'), None
334 340 )
335 341 )
336 342 else:
337 fremote.set(remoteresult)
338
339 # And ask the coroutine to decode that value.
340 343 try:
341 result = next(batchable)
344 result = decode(remoteresult)
342 345 except Exception:
343 346 pycompat.future_set_exception_info(f, sys.exc_info()[1:])
344 347 else:
345 348 f.set_result(result)
346 349
347 350
348 351 @interfaceutil.implementer(
349 352 repository.ipeercommands, repository.ipeerlegacycommands
350 353 )
351 354 class wirepeer(repository.peer):
352 355 """Client-side interface for communicating with a peer repository.
353 356
354 357 Methods commonly call wire protocol commands of the same name.
355 358
356 359 See also httppeer.py and sshpeer.py for protocol-specific
357 360 implementations of this interface.
358 361 """
359 362
360 363 def commandexecutor(self):
361 364 return peerexecutor(self)
362 365
363 366 # Begin of ipeercommands interface.
364 367
365 368 def clonebundles(self):
366 369 self.requirecap(b'clonebundles', _(b'clone bundles'))
367 370 return self._call(b'clonebundles')
368 371
369 372 @batchable
370 373 def lookup(self, key):
371 374 self.requirecap(b'lookup', _(b'look up remote revision'))
372 375 f = future()
373 376 yield {b'key': encoding.fromlocal(key)}, f
374 377 d = f.value
375 378 success, data = d[:-1].split(b" ", 1)
376 379 if int(success):
377 380 yield bin(data)
378 381 else:
379 382 self._abort(error.RepoError(data))
380 383
381 384 @batchable
382 385 def heads(self):
383 386 f = future()
384 387 yield {}, f
385 388 d = f.value
386 389 try:
387 390 yield wireprototypes.decodelist(d[:-1])
388 391 except ValueError:
389 392 self._abort(error.ResponseError(_(b"unexpected response:"), d))
390 393
391 394 @batchable
392 395 def known(self, nodes):
393 396 f = future()
394 397 yield {b'nodes': wireprototypes.encodelist(nodes)}, f
395 398 d = f.value
396 399 try:
397 400 yield [bool(int(b)) for b in pycompat.iterbytestr(d)]
398 401 except ValueError:
399 402 self._abort(error.ResponseError(_(b"unexpected response:"), d))
400 403
401 404 @batchable
402 405 def branchmap(self):
403 406 f = future()
404 407 yield {}, f
405 408 d = f.value
406 409 try:
407 410 branchmap = {}
408 411 for branchpart in d.splitlines():
409 412 branchname, branchheads = branchpart.split(b' ', 1)
410 413 branchname = encoding.tolocal(urlreq.unquote(branchname))
411 414 branchheads = wireprototypes.decodelist(branchheads)
412 415 branchmap[branchname] = branchheads
413 416 yield branchmap
414 417 except TypeError:
415 418 self._abort(error.ResponseError(_(b"unexpected response:"), d))
416 419
417 420 @batchable
418 421 def listkeys(self, namespace):
419 422 if not self.capable(b'pushkey'):
420 423 yield {}, None
421 424 f = future()
422 425 self.ui.debug(b'preparing listkeys for "%s"\n' % namespace)
423 426 yield {b'namespace': encoding.fromlocal(namespace)}, f
424 427 d = f.value
425 428 self.ui.debug(
426 429 b'received listkey for "%s": %i bytes\n' % (namespace, len(d))
427 430 )
428 431 yield pushkeymod.decodekeys(d)
429 432
430 433 @batchable
431 434 def pushkey(self, namespace, key, old, new):
432 435 if not self.capable(b'pushkey'):
433 436 yield False, None
434 437 f = future()
435 438 self.ui.debug(b'preparing pushkey for "%s:%s"\n' % (namespace, key))
436 439 yield {
437 440 b'namespace': encoding.fromlocal(namespace),
438 441 b'key': encoding.fromlocal(key),
439 442 b'old': encoding.fromlocal(old),
440 443 b'new': encoding.fromlocal(new),
441 444 }, f
442 445 d = f.value
443 446 d, output = d.split(b'\n', 1)
444 447 try:
445 448 d = bool(int(d))
446 449 except ValueError:
447 450 raise error.ResponseError(
448 451 _(b'push failed (unexpected response):'), d
449 452 )
450 453 for l in output.splitlines(True):
451 454 self.ui.status(_(b'remote: '), l)
452 455 yield d
453 456
454 457 def stream_out(self):
455 458 return self._callstream(b'stream_out')
456 459
457 460 def getbundle(self, source, **kwargs):
458 461 kwargs = pycompat.byteskwargs(kwargs)
459 462 self.requirecap(b'getbundle', _(b'look up remote changes'))
460 463 opts = {}
461 464 bundlecaps = kwargs.get(b'bundlecaps') or set()
462 465 for key, value in pycompat.iteritems(kwargs):
463 466 if value is None:
464 467 continue
465 468 keytype = wireprototypes.GETBUNDLE_ARGUMENTS.get(key)
466 469 if keytype is None:
467 470 raise error.ProgrammingError(
468 471 b'Unexpectedly None keytype for key %s' % key
469 472 )
470 473 elif keytype == b'nodes':
471 474 value = wireprototypes.encodelist(value)
472 475 elif keytype == b'csv':
473 476 value = b','.join(value)
474 477 elif keytype == b'scsv':
475 478 value = b','.join(sorted(value))
476 479 elif keytype == b'boolean':
477 480 value = b'%i' % bool(value)
478 481 elif keytype != b'plain':
479 482 raise KeyError(b'unknown getbundle option type %s' % keytype)
480 483 opts[key] = value
481 484 f = self._callcompressable(b"getbundle", **pycompat.strkwargs(opts))
482 485 if any((cap.startswith(b'HG2') for cap in bundlecaps)):
483 486 return bundle2.getunbundler(self.ui, f)
484 487 else:
485 488 return changegroupmod.cg1unpacker(f, b'UN')
486 489
487 490 def unbundle(self, bundle, heads, url):
488 491 """Send cg (a readable file-like object representing the
489 492 changegroup to push, typically a chunkbuffer object) to the
490 493 remote server as a bundle.
491 494
492 495 When pushing a bundle10 stream, return an integer indicating the
493 496 result of the push (see changegroup.apply()).
494 497
495 498 When pushing a bundle20 stream, return a bundle20 stream.
496 499
497 500 `url` is the url the client thinks it's pushing to, which is
498 501 visible to hooks.
499 502 """
500 503
501 504 if heads != [b'force'] and self.capable(b'unbundlehash'):
502 505 heads = wireprototypes.encodelist(
503 506 [b'hashed', hashutil.sha1(b''.join(sorted(heads))).digest()]
504 507 )
505 508 else:
506 509 heads = wireprototypes.encodelist(heads)
507 510
508 511 if util.safehasattr(bundle, b'deltaheader'):
509 512 # this a bundle10, do the old style call sequence
510 513 ret, output = self._callpush(b"unbundle", bundle, heads=heads)
511 514 if ret == b"":
512 515 raise error.ResponseError(_(b'push failed:'), output)
513 516 try:
514 517 ret = int(ret)
515 518 except ValueError:
516 519 raise error.ResponseError(
517 520 _(b'push failed (unexpected response):'), ret
518 521 )
519 522
520 523 for l in output.splitlines(True):
521 524 self.ui.status(_(b'remote: '), l)
522 525 else:
523 526 # bundle2 push. Send a stream, fetch a stream.
524 527 stream = self._calltwowaystream(b'unbundle', bundle, heads=heads)
525 528 ret = bundle2.getunbundler(self.ui, stream)
526 529 return ret
527 530
528 531 # End of ipeercommands interface.
529 532
530 533 # Begin of ipeerlegacycommands interface.
531 534
532 535 def branches(self, nodes):
533 536 n = wireprototypes.encodelist(nodes)
534 537 d = self._call(b"branches", nodes=n)
535 538 try:
536 539 br = [tuple(wireprototypes.decodelist(b)) for b in d.splitlines()]
537 540 return br
538 541 except ValueError:
539 542 self._abort(error.ResponseError(_(b"unexpected response:"), d))
540 543
541 544 def between(self, pairs):
542 545 batch = 8 # avoid giant requests
543 546 r = []
544 547 for i in pycompat.xrange(0, len(pairs), batch):
545 548 n = b" ".join(
546 549 [
547 550 wireprototypes.encodelist(p, b'-')
548 551 for p in pairs[i : i + batch]
549 552 ]
550 553 )
551 554 d = self._call(b"between", pairs=n)
552 555 try:
553 556 r.extend(
554 557 l and wireprototypes.decodelist(l) or []
555 558 for l in d.splitlines()
556 559 )
557 560 except ValueError:
558 561 self._abort(error.ResponseError(_(b"unexpected response:"), d))
559 562 return r
560 563
561 564 def changegroup(self, nodes, source):
562 565 n = wireprototypes.encodelist(nodes)
563 566 f = self._callcompressable(b"changegroup", roots=n)
564 567 return changegroupmod.cg1unpacker(f, b'UN')
565 568
566 569 def changegroupsubset(self, bases, heads, source):
567 570 self.requirecap(b'changegroupsubset', _(b'look up remote changes'))
568 571 bases = wireprototypes.encodelist(bases)
569 572 heads = wireprototypes.encodelist(heads)
570 573 f = self._callcompressable(
571 574 b"changegroupsubset", bases=bases, heads=heads
572 575 )
573 576 return changegroupmod.cg1unpacker(f, b'UN')
574 577
575 578 # End of ipeerlegacycommands interface.
576 579
577 580 def _submitbatch(self, req):
578 581 """run batch request <req> on the server
579 582
580 583 Returns an iterator of the raw responses from the server.
581 584 """
582 585 ui = self.ui
583 586 if ui.debugflag and ui.configbool(b'devel', b'debug.peer-request'):
584 587 ui.debug(b'devel-peer-request: batched-content\n')
585 588 for op, args in req:
586 589 msg = b'devel-peer-request: - %s (%d arguments)\n'
587 590 ui.debug(msg % (op, len(args)))
588 591
589 592 unescapearg = wireprototypes.unescapebatcharg
590 593
591 594 rsp = self._callstream(b"batch", cmds=encodebatchcmds(req))
592 595 chunk = rsp.read(1024)
593 596 work = [chunk]
594 597 while chunk:
595 598 while b';' not in chunk and chunk:
596 599 chunk = rsp.read(1024)
597 600 work.append(chunk)
598 601 merged = b''.join(work)
599 602 while b';' in merged:
600 603 one, merged = merged.split(b';', 1)
601 604 yield unescapearg(one)
602 605 chunk = rsp.read(1024)
603 606 work = [merged, chunk]
604 607 yield unescapearg(b''.join(work))
605 608
606 609 def _submitone(self, op, args):
607 610 return self._call(op, **pycompat.strkwargs(args))
608 611
609 612 def debugwireargs(self, one, two, three=None, four=None, five=None):
610 613 # don't pass optional arguments left at their default value
611 614 opts = {}
612 615 if three is not None:
613 616 opts['three'] = three
614 617 if four is not None:
615 618 opts['four'] = four
616 619 return self._call(b'debugwireargs', one=one, two=two, **opts)
617 620
618 621 def _call(self, cmd, **args):
619 622 """execute <cmd> on the server
620 623
621 624 The command is expected to return a simple string.
622 625
623 626 returns the server reply as a string."""
624 627 raise NotImplementedError()
625 628
626 629 def _callstream(self, cmd, **args):
627 630 """execute <cmd> on the server
628 631
629 632 The command is expected to return a stream. Note that if the
630 633 command doesn't return a stream, _callstream behaves
631 634 differently for ssh and http peers.
632 635
633 636 returns the server reply as a file like object.
634 637 """
635 638 raise NotImplementedError()
636 639
637 640 def _callcompressable(self, cmd, **args):
638 641 """execute <cmd> on the server
639 642
640 643 The command is expected to return a stream.
641 644
642 645 The stream may have been compressed in some implementations. This
643 646 function takes care of the decompression. This is the only difference
644 647 with _callstream.
645 648
646 649 returns the server reply as a file like object.
647 650 """
648 651 raise NotImplementedError()
649 652
650 653 def _callpush(self, cmd, fp, **args):
651 654 """execute a <cmd> on server
652 655
653 656 The command is expected to be related to a push. Push has a special
654 657 return method.
655 658
656 659 returns the server reply as a (ret, output) tuple. ret is either
657 660 empty (error) or a stringified int.
658 661 """
659 662 raise NotImplementedError()
660 663
661 664 def _calltwowaystream(self, cmd, fp, **args):
662 665 """execute <cmd> on server
663 666
664 667 The command will send a stream to the server and get a stream in reply.
665 668 """
666 669 raise NotImplementedError()
667 670
668 671 def _abort(self, exception):
669 672 """clearly abort the wire protocol connection and raise the exception"""
670 673 raise NotImplementedError()
General Comments 0
You need to be logged in to leave comments. Login now