##// END OF EJS Templates
add copy_threshold to limit use of zero-copy to sufficiently large messages...
MinRK -
Show More
@@ -49,7 +49,7 b' from IPython.utils.importstring import import_item'
49 from IPython.utils.jsonutil import extract_dates, squash_dates, date_default
49 from IPython.utils.jsonutil import extract_dates, squash_dates, date_default
50 from IPython.utils.py3compat import str_to_bytes
50 from IPython.utils.py3compat import str_to_bytes
51 from IPython.utils.traitlets import (CBytes, Unicode, Bool, Any, Instance, Set,
51 from IPython.utils.traitlets import (CBytes, Unicode, Bool, Any, Instance, Set,
52 DottedObjectName, CUnicode, Dict)
52 DottedObjectName, CUnicode, Dict, Int)
53
53
54 #-----------------------------------------------------------------------------
54 #-----------------------------------------------------------------------------
55 # utility functions
55 # utility functions
@@ -84,8 +84,9 b' pickle_unpacker = pickle.loads'
84 default_packer = json_packer
84 default_packer = json_packer
85 default_unpacker = json_unpacker
85 default_unpacker = json_unpacker
86
86
87 DELIM=b"<IDS|MSG>"
87 DELIM = b"<IDS|MSG>"
88
88 # singleton dummy tracker, which will always report as done
89 DONE = zmq.MessageTracker()
89
90
90 #-----------------------------------------------------------------------------
91 #-----------------------------------------------------------------------------
91 # Mixin tools for apps that use Sessions
92 # Mixin tools for apps that use Sessions
@@ -329,6 +330,9 b' class Session(Configurable):'
329 # unpacker is not checked - it is assumed to be
330 # unpacker is not checked - it is assumed to be
330 if not callable(new):
331 if not callable(new):
331 raise TypeError("unpacker must be callable, not %s"%type(new))
332 raise TypeError("unpacker must be callable, not %s"%type(new))
333
334 copy_threshold = Int(2**12, config=True,
335 help="Threshold (in bytes) beyond which a buffer should be sent without copying.")
332
336
333 def __init__(self, **kwargs):
337 def __init__(self, **kwargs):
334 """create a Session object
338 """create a Session object
@@ -544,11 +548,6 b' class Session(Configurable):'
544 -------
548 -------
545 msg : dict
549 msg : dict
546 The constructed message.
550 The constructed message.
547 (msg,tracker) : (dict, MessageTracker)
548 if track=True, then a 2-tuple will be returned,
549 the first element being the constructed
550 message, and the second being the MessageTracker
551
552 """
551 """
553
552
554 if not isinstance(stream, (zmq.Socket, ZMQStream)):
553 if not isinstance(stream, (zmq.Socket, ZMQStream)):
@@ -566,25 +565,18 b' class Session(Configurable):'
566
565
567 buffers = [] if buffers is None else buffers
566 buffers = [] if buffers is None else buffers
568 to_send = self.serialize(msg, ident)
567 to_send = self.serialize(msg, ident)
569 flag = 0
568 to_send.extend(buffers)
570 if buffers:
569 longest = max([ len(s) for s in to_send ])
571 flag = zmq.SNDMORE
570 copy = (longest > self.copy_threshold)
572 _track = False
571
572 if buffers and track and not copy:
573 # only really track when we are doing zero-copy buffers
574 tracker = stream.send_multipart(to_send, copy=False, track=True)
573 else:
575 else:
574 _track=track
576 # use dummy tracker, which will be done immediately
575 if track:
577 tracker = DONE
576 tracker = stream.send_multipart(to_send, flag, copy=False, track=_track)
578 stream.send_multipart(to_send, copy=copy)
577 else:
578 tracker = stream.send_multipart(to_send, flag, copy=False)
579 for b in buffers[:-1]:
580 stream.send(b, flag, copy=False)
581 if buffers:
582 if track:
583 tracker = stream.send(buffers[-1], copy=False, track=track)
584 else:
585 tracker = stream.send(buffers[-1], copy=False)
586
579
587 # omsg = Message(msg)
588 if self.debug:
580 if self.debug:
589 pprint.pprint(msg)
581 pprint.pprint(msg)
590 pprint.pprint(to_send)
582 pprint.pprint(to_send)
General Comments 0
You need to be logged in to leave comments. Login now