##// END OF EJS Templates
Merge pull request #2500 from bfroehle/outstream_encoding...
Bradley M. Froehle -
r8597:2e1736cc merge
parent child Browse files
Show More
@@ -1,95 +1,95 b''
1 1 import sys
2 2 import time
3 3 from io import StringIO
4 4
5 5 from session import extract_header, Message
6 6
7 from IPython.utils import io, text, encoding
7 from IPython.utils import io, text
8 8 from IPython.utils import py3compat
9 9
10 10 #-----------------------------------------------------------------------------
11 11 # Globals
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Stream classes
16 16 #-----------------------------------------------------------------------------
17 17
18 18 class OutStream(object):
19 19 """A file like object that publishes the stream to a 0MQ PUB socket."""
20 20
21 21 # The time interval between automatic flushes, in seconds.
22 22 flush_interval = 0.05
23 23 topic=None
24 24
25 25 def __init__(self, session, pub_socket, name):
26 self.encoding = 'UTF-8'
26 27 self.session = session
27 28 self.pub_socket = pub_socket
28 29 self.name = name
29 30 self.parent_header = {}
30 31 self._new_buffer()
31 32
32 33 def set_parent(self, parent):
33 34 self.parent_header = extract_header(parent)
34 35
35 36 def close(self):
36 37 self.pub_socket = None
37 38
38 39 def flush(self):
39 40 #io.rprint('>>>flushing output buffer: %s<<<' % self.name) # dbg
40 41 if self.pub_socket is None:
41 42 raise ValueError(u'I/O operation on closed file')
42 43 else:
43 44 data = self._buffer.getvalue()
44 45 if data:
45 46 content = {u'name':self.name, u'data':data}
46 47 msg = self.session.send(self.pub_socket, u'stream', content=content,
47 48 parent=self.parent_header, ident=self.topic)
48 49
49 50 if hasattr(self.pub_socket, 'flush'):
50 51 # socket itself has flush (presumably ZMQStream)
51 52 self.pub_socket.flush()
52 53 self._buffer.close()
53 54 self._new_buffer()
54 55
55 56 def isatty(self):
56 57 return False
57 58
58 59 def __next__(self):
59 60 raise IOError('Read not supported on a write only stream.')
60 61
61 62 if not py3compat.PY3:
62 63 next = __next__
63 64
64 65 def read(self, size=-1):
65 66 raise IOError('Read not supported on a write only stream.')
66 67
67 68 def readline(self, size=-1):
68 69 raise IOError('Read not supported on a write only stream.')
69 70
70 71 def write(self, string):
71 72 if self.pub_socket is None:
72 73 raise ValueError('I/O operation on closed file')
73 74 else:
74 75 # Make sure that we're handling unicode
75 76 if not isinstance(string, unicode):
76 enc = encoding.DEFAULT_ENCODING
77 string = string.decode(enc, 'replace')
78
77 string = string.decode(self.encoding, 'replace')
78
79 79 self._buffer.write(string)
80 80 current_time = time.time()
81 81 if self._start <= 0:
82 82 self._start = current_time
83 83 elif current_time - self._start > self.flush_interval:
84 84 self.flush()
85 85
86 86 def writelines(self, sequence):
87 87 if self.pub_socket is None:
88 88 raise ValueError('I/O operation on closed file')
89 89 else:
90 90 for string in sequence:
91 91 self.write(string)
92 92
93 93 def _new_buffer(self):
94 94 self._buffer = StringIO()
95 95 self._start = -1
General Comments 0
You need to be logged in to leave comments. Login now