##// END OF EJS Templates
Backport PR #2500: Add `encoding` attribute to `OutStream` class....
MinRK -
Show More
@@ -1,91 +1,91 b''
1 import sys
1 import sys
2 import time
2 import time
3 from io import StringIO
3 from io import StringIO
4
4
5 from session import extract_header, Message
5 from session import extract_header, Message
6
6
7 from IPython.utils import io, text, encoding
7 from IPython.utils import io, text
8
8
9 #-----------------------------------------------------------------------------
9 #-----------------------------------------------------------------------------
10 # Globals
10 # Globals
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12
12
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14 # Stream classes
14 # Stream classes
15 #-----------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
16
16
17 class OutStream(object):
17 class OutStream(object):
18 """A file like object that publishes the stream to a 0MQ PUB socket."""
18 """A file like object that publishes the stream to a 0MQ PUB socket."""
19
19
20 # The time interval between automatic flushes, in seconds.
20 # The time interval between automatic flushes, in seconds.
21 flush_interval = 0.05
21 flush_interval = 0.05
22 topic=None
22 topic=None
23
23
24 def __init__(self, session, pub_socket, name):
24 def __init__(self, session, pub_socket, name):
25 self.encoding = 'UTF-8'
25 self.session = session
26 self.session = session
26 self.pub_socket = pub_socket
27 self.pub_socket = pub_socket
27 self.name = name
28 self.name = name
28 self.parent_header = {}
29 self.parent_header = {}
29 self._new_buffer()
30 self._new_buffer()
30
31
31 def set_parent(self, parent):
32 def set_parent(self, parent):
32 self.parent_header = extract_header(parent)
33 self.parent_header = extract_header(parent)
33
34
34 def close(self):
35 def close(self):
35 self.pub_socket = None
36 self.pub_socket = None
36
37
37 def flush(self):
38 def flush(self):
38 #io.rprint('>>>flushing output buffer: %s<<<' % self.name) # dbg
39 #io.rprint('>>>flushing output buffer: %s<<<' % self.name) # dbg
39 if self.pub_socket is None:
40 if self.pub_socket is None:
40 raise ValueError(u'I/O operation on closed file')
41 raise ValueError(u'I/O operation on closed file')
41 else:
42 else:
42 data = self._buffer.getvalue()
43 data = self._buffer.getvalue()
43 if data:
44 if data:
44 content = {u'name':self.name, u'data':data}
45 content = {u'name':self.name, u'data':data}
45 msg = self.session.send(self.pub_socket, u'stream', content=content,
46 msg = self.session.send(self.pub_socket, u'stream', content=content,
46 parent=self.parent_header, ident=self.topic)
47 parent=self.parent_header, ident=self.topic)
47
48
48 if hasattr(self.pub_socket, 'flush'):
49 if hasattr(self.pub_socket, 'flush'):
49 # socket itself has flush (presumably ZMQStream)
50 # socket itself has flush (presumably ZMQStream)
50 self.pub_socket.flush()
51 self.pub_socket.flush()
51 self._buffer.close()
52 self._buffer.close()
52 self._new_buffer()
53 self._new_buffer()
53
54
54 def isatty(self):
55 def isatty(self):
55 return False
56 return False
56
57
57 def next(self):
58 def next(self):
58 raise IOError('Read not supported on a write only stream.')
59 raise IOError('Read not supported on a write only stream.')
59
60
60 def read(self, size=-1):
61 def read(self, size=-1):
61 raise IOError('Read not supported on a write only stream.')
62 raise IOError('Read not supported on a write only stream.')
62
63
63 def readline(self, size=-1):
64 def readline(self, size=-1):
64 raise IOError('Read not supported on a write only stream.')
65 raise IOError('Read not supported on a write only stream.')
65
66
66 def write(self, string):
67 def write(self, string):
67 if self.pub_socket is None:
68 if self.pub_socket is None:
68 raise ValueError('I/O operation on closed file')
69 raise ValueError('I/O operation on closed file')
69 else:
70 else:
70 # Make sure that we're handling unicode
71 # Make sure that we're handling unicode
71 if not isinstance(string, unicode):
72 if not isinstance(string, unicode):
72 enc = encoding.DEFAULT_ENCODING
73 string = string.decode(self.encoding, 'replace')
73 string = string.decode(enc, 'replace')
74
74
75 self._buffer.write(string)
75 self._buffer.write(string)
76 current_time = time.time()
76 current_time = time.time()
77 if self._start <= 0:
77 if self._start <= 0:
78 self._start = current_time
78 self._start = current_time
79 elif current_time - self._start > self.flush_interval:
79 elif current_time - self._start > self.flush_interval:
80 self.flush()
80 self.flush()
81
81
82 def writelines(self, sequence):
82 def writelines(self, sequence):
83 if self.pub_socket is None:
83 if self.pub_socket is None:
84 raise ValueError('I/O operation on closed file')
84 raise ValueError('I/O operation on closed file')
85 else:
85 else:
86 for string in sequence:
86 for string in sequence:
87 self.write(string)
87 self.write(string)
88
88
89 def _new_buffer(self):
89 def _new_buffer(self):
90 self._buffer = StringIO()
90 self._buffer = StringIO()
91 self._start = -1
91 self._start = -1
General Comments 0
You need to be logged in to leave comments. Login now