Show More
@@ -49,7 +49,7 b' from IPython.utils.importstring import import_item' | |||||
49 | from IPython.utils.jsonutil import extract_dates, squash_dates, date_default |
|
49 | from IPython.utils.jsonutil import extract_dates, squash_dates, date_default | |
50 | from IPython.utils.py3compat import str_to_bytes |
|
50 | from IPython.utils.py3compat import str_to_bytes | |
51 | from IPython.utils.traitlets import (CBytes, Unicode, Bool, Any, Instance, Set, |
|
51 | from IPython.utils.traitlets import (CBytes, Unicode, Bool, Any, Instance, Set, | |
52 | DottedObjectName, CUnicode, Dict) |
|
52 | DottedObjectName, CUnicode, Dict, Int) | |
53 |
|
53 | |||
54 | #----------------------------------------------------------------------------- |
|
54 | #----------------------------------------------------------------------------- | |
55 | # utility functions |
|
55 | # utility functions | |
@@ -84,8 +84,9 b' pickle_unpacker = pickle.loads' | |||||
84 | default_packer = json_packer |
|
84 | default_packer = json_packer | |
85 | default_unpacker = json_unpacker |
|
85 | default_unpacker = json_unpacker | |
86 |
|
86 | |||
87 | DELIM=b"<IDS|MSG>" |
|
87 | DELIM = b"<IDS|MSG>" | |
88 |
|
88 | # singleton dummy tracker, which will always report as done | ||
|
89 | DONE = zmq.MessageTracker() | |||
89 |
|
90 | |||
90 | #----------------------------------------------------------------------------- |
|
91 | #----------------------------------------------------------------------------- | |
91 | # Mixin tools for apps that use Sessions |
|
92 | # Mixin tools for apps that use Sessions | |
@@ -329,6 +330,9 b' class Session(Configurable):' | |||||
329 | # unpacker is not checked - it is assumed to be |
|
330 | # unpacker is not checked - it is assumed to be | |
330 | if not callable(new): |
|
331 | if not callable(new): | |
331 | raise TypeError("unpacker must be callable, not %s"%type(new)) |
|
332 | raise TypeError("unpacker must be callable, not %s"%type(new)) | |
|
333 | ||||
|
334 | copy_threshold = Int(2**12, config=True, | |||
|
335 | help="Threshold (in bytes) beyond which a buffer should be sent without copying.") | |||
332 |
|
336 | |||
333 | def __init__(self, **kwargs): |
|
337 | def __init__(self, **kwargs): | |
334 | """create a Session object |
|
338 | """create a Session object | |
@@ -544,11 +548,6 b' class Session(Configurable):' | |||||
544 | ------- |
|
548 | ------- | |
545 | msg : dict |
|
549 | msg : dict | |
546 | The constructed message. |
|
550 | The constructed message. | |
547 | (msg,tracker) : (dict, MessageTracker) |
|
|||
548 | if track=True, then a 2-tuple will be returned, |
|
|||
549 | the first element being the constructed |
|
|||
550 | message, and the second being the MessageTracker |
|
|||
551 |
|
||||
552 | """ |
|
551 | """ | |
553 |
|
552 | |||
554 | if not isinstance(stream, (zmq.Socket, ZMQStream)): |
|
553 | if not isinstance(stream, (zmq.Socket, ZMQStream)): | |
@@ -566,25 +565,18 b' class Session(Configurable):' | |||||
566 |
|
565 | |||
567 | buffers = [] if buffers is None else buffers |
|
566 | buffers = [] if buffers is None else buffers | |
568 | to_send = self.serialize(msg, ident) |
|
567 | to_send = self.serialize(msg, ident) | |
569 | flag = 0 |
|
568 | to_send.extend(buffers) | |
570 | if buffers: |
|
569 | longest = max([ len(s) for s in to_send ]) | |
571 | flag = zmq.SNDMORE |
|
570 | copy = (longest > self.copy_threshold) | |
572 | _track = False |
|
571 | ||
|
572 | if buffers and track and not copy: | |||
|
573 | # only really track when we are doing zero-copy buffers | |||
|
574 | tracker = stream.send_multipart(to_send, copy=False, track=True) | |||
573 | else: |
|
575 | else: | |
574 | _track=track |
|
576 | # use dummy tracker, which will be done immediately | |
575 |
|
|
577 | tracker = DONE | |
576 |
|
|
578 | stream.send_multipart(to_send, copy=copy) | |
577 | else: |
|
|||
578 | tracker = stream.send_multipart(to_send, flag, copy=False) |
|
|||
579 | for b in buffers[:-1]: |
|
|||
580 | stream.send(b, flag, copy=False) |
|
|||
581 | if buffers: |
|
|||
582 | if track: |
|
|||
583 | tracker = stream.send(buffers[-1], copy=False, track=track) |
|
|||
584 | else: |
|
|||
585 | tracker = stream.send(buffers[-1], copy=False) |
|
|||
586 |
|
579 | |||
587 | # omsg = Message(msg) |
|
|||
588 | if self.debug: |
|
580 | if self.debug: | |
589 | pprint.pprint(msg) |
|
581 | pprint.pprint(msg) | |
590 | pprint.pprint(to_send) |
|
582 | pprint.pprint(to_send) |
General Comments 0
You need to be logged in to leave comments.
Login now