##// END OF EJS Templates
flush pub_socket in OutStream.flush...
MinRK -
Show More
@@ -1,88 +1,91 b''
1 1 import sys
2 2 import time
3 3 from io import StringIO
4 4
5 5 from session import extract_header, Message
6 6
7 7 from IPython.utils import io
8 8
9 9 #-----------------------------------------------------------------------------
10 10 # Globals
11 11 #-----------------------------------------------------------------------------
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Stream classes
15 15 #-----------------------------------------------------------------------------
16 16
17 17 class OutStream(object):
18 18 """A file like object that publishes the stream to a 0MQ PUB socket."""
19 19
20 20 # The time interval between automatic flushes, in seconds.
21 21 flush_interval = 0.05
22 22 topic=None
23 23
24 24 def __init__(self, session, pub_socket, name):
25 25 self.session = session
26 26 self.pub_socket = pub_socket
27 27 self.name = name
28 28 self.parent_header = {}
29 29 self._new_buffer()
30 30
31 31 def set_parent(self, parent):
32 32 self.parent_header = extract_header(parent)
33 33
34 34 def close(self):
35 35 self.pub_socket = None
36 36
37 37 def flush(self):
38 38 #io.rprint('>>>flushing output buffer: %s<<<' % self.name) # dbg
39 39 if self.pub_socket is None:
40 40 raise ValueError(u'I/O operation on closed file')
41 41 else:
42 42 data = self._buffer.getvalue()
43 43 if data:
44 44 content = {u'name':self.name, u'data':data}
45 45 msg = self.session.send(self.pub_socket, u'stream', content=content,
46 46 parent=self.parent_header, ident=self.topic)
47 47
48 if hasattr(self.pub_socket, 'flush'):
49 # socket itself has flush (presumably ZMQStream)
50 self.pub_socket.flush()
48 51 self._buffer.close()
49 52 self._new_buffer()
50 53
51 54 def isatty(self):
52 55 return False
53 56
54 57 def next(self):
55 58 raise IOError('Read not supported on a write only stream.')
56 59
57 60 def read(self, size=-1):
58 61 raise IOError('Read not supported on a write only stream.')
59 62
60 63 def readline(self, size=-1):
61 64 raise IOError('Read not supported on a write only stream.')
62 65
63 66 def write(self, string):
64 67 if self.pub_socket is None:
65 68 raise ValueError('I/O operation on closed file')
66 69 else:
67 70 # Make sure that we're handling unicode
68 71 if not isinstance(string, unicode):
69 72 enc = sys.stdin.encoding or sys.getdefaultencoding()
70 73 string = string.decode(enc, 'replace')
71 74
72 75 self._buffer.write(string)
73 76 current_time = time.time()
74 77 if self._start <= 0:
75 78 self._start = current_time
76 79 elif current_time - self._start > self.flush_interval:
77 80 self.flush()
78 81
79 82 def writelines(self, sequence):
80 83 if self.pub_socket is None:
81 84 raise ValueError('I/O operation on closed file')
82 85 else:
83 86 for string in sequence:
84 87 self.write(string)
85 88
86 89 def _new_buffer(self):
87 90 self._buffer = StringIO()
88 91 self._start = -1
General Comments 0
You need to be logged in to leave comments. Login now