##// END OF EJS Templates
remove module-level logger in zmq.iostream...
MinRK -
Show More
@@ -1,93 +1,88 b''
1 import logging
2 import sys
1 import sys
3 import time
2 import time
4 from io import StringIO
3 from io import StringIO
5
4
6 from session import extract_header, Message
5 from session import extract_header, Message
7
6
8 from IPython.utils import io
7 from IPython.utils import io
9
8
10 #-----------------------------------------------------------------------------
9 #-----------------------------------------------------------------------------
11 # Globals
10 # Globals
12 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
13
12
14 # Module-level logger
15 logger = logging.getLogger(__name__)
16
17 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
18 # Stream classes
14 # Stream classes
19 #-----------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
20
16
21 class OutStream(object):
17 class OutStream(object):
22 """A file like object that publishes the stream to a 0MQ PUB socket."""
18 """A file like object that publishes the stream to a 0MQ PUB socket."""
23
19
24 # The time interval between automatic flushes, in seconds.
20 # The time interval between automatic flushes, in seconds.
25 flush_interval = 0.05
21 flush_interval = 0.05
26 topic=None
22 topic=None
27
23
28 def __init__(self, session, pub_socket, name):
24 def __init__(self, session, pub_socket, name):
29 self.session = session
25 self.session = session
30 self.pub_socket = pub_socket
26 self.pub_socket = pub_socket
31 self.name = name
27 self.name = name
32 self.parent_header = {}
28 self.parent_header = {}
33 self._new_buffer()
29 self._new_buffer()
34
30
35 def set_parent(self, parent):
31 def set_parent(self, parent):
36 self.parent_header = extract_header(parent)
32 self.parent_header = extract_header(parent)
37
33
38 def close(self):
34 def close(self):
39 self.pub_socket = None
35 self.pub_socket = None
40
36
41 def flush(self):
37 def flush(self):
42 #io.rprint('>>>flushing output buffer: %s<<<' % self.name) # dbg
38 #io.rprint('>>>flushing output buffer: %s<<<' % self.name) # dbg
43 if self.pub_socket is None:
39 if self.pub_socket is None:
44 raise ValueError(u'I/O operation on closed file')
40 raise ValueError(u'I/O operation on closed file')
45 else:
41 else:
46 data = self._buffer.getvalue()
42 data = self._buffer.getvalue()
47 if data:
43 if data:
48 content = {u'name':self.name, u'data':data}
44 content = {u'name':self.name, u'data':data}
49 msg = self.session.send(self.pub_socket, u'stream', content=content,
45 msg = self.session.send(self.pub_socket, u'stream', content=content,
50 parent=self.parent_header, ident=self.topic)
46 parent=self.parent_header, ident=self.topic)
51 logger.debug(msg)
52
47
53 self._buffer.close()
48 self._buffer.close()
54 self._new_buffer()
49 self._new_buffer()
55
50
56 def isatty(self):
51 def isatty(self):
57 return False
52 return False
58
53
59 def next(self):
54 def next(self):
60 raise IOError('Read not supported on a write only stream.')
55 raise IOError('Read not supported on a write only stream.')
61
56
62 def read(self, size=-1):
57 def read(self, size=-1):
63 raise IOError('Read not supported on a write only stream.')
58 raise IOError('Read not supported on a write only stream.')
64
59
65 def readline(self, size=-1):
60 def readline(self, size=-1):
66 raise IOError('Read not supported on a write only stream.')
61 raise IOError('Read not supported on a write only stream.')
67
62
68 def write(self, string):
63 def write(self, string):
69 if self.pub_socket is None:
64 if self.pub_socket is None:
70 raise ValueError('I/O operation on closed file')
65 raise ValueError('I/O operation on closed file')
71 else:
66 else:
72 # Make sure that we're handling unicode
67 # Make sure that we're handling unicode
73 if not isinstance(string, unicode):
68 if not isinstance(string, unicode):
74 enc = sys.stdin.encoding or sys.getdefaultencoding()
69 enc = sys.stdin.encoding or sys.getdefaultencoding()
75 string = string.decode(enc, 'replace')
70 string = string.decode(enc, 'replace')
76
71
77 self._buffer.write(string)
72 self._buffer.write(string)
78 current_time = time.time()
73 current_time = time.time()
79 if self._start <= 0:
74 if self._start <= 0:
80 self._start = current_time
75 self._start = current_time
81 elif current_time - self._start > self.flush_interval:
76 elif current_time - self._start > self.flush_interval:
82 self.flush()
77 self.flush()
83
78
84 def writelines(self, sequence):
79 def writelines(self, sequence):
85 if self.pub_socket is None:
80 if self.pub_socket is None:
86 raise ValueError('I/O operation on closed file')
81 raise ValueError('I/O operation on closed file')
87 else:
82 else:
88 for string in sequence:
83 for string in sequence:
89 self.write(string)
84 self.write(string)
90
85
91 def _new_buffer(self):
86 def _new_buffer(self):
92 self._buffer = StringIO()
87 self._buffer = StringIO()
93 self._start = -1
88 self._start = -1
General Comments 0
You need to be logged in to leave comments. Login now