##// END OF EJS Templates
Add a metadata attribute to zeromq stdout/stderr stream messages
Jason Grout -
Show More
@@ -1,95 +1,102 b''
1 import sys
1 import sys
2 import time
2 import time
3 from io import StringIO
3 from io import StringIO
4
4
5 from session import extract_header, Message
5 from session import extract_header, Message
6
6
7 from IPython.utils import io, text, encoding
7 from IPython.utils import io, text, encoding
8 from IPython.utils import py3compat
8 from IPython.utils import py3compat
9
9
10 #-----------------------------------------------------------------------------
10 #-----------------------------------------------------------------------------
11 # Globals
11 # Globals
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Stream classes
15 # Stream classes
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 class OutStream(object):
18 class OutStream(object):
19 """A file like object that publishes the stream to a 0MQ PUB socket."""
19 """A file like object that publishes the stream to a 0MQ PUB socket."""
20
20
21 # The time interval between automatic flushes, in seconds.
21 # The time interval between automatic flushes, in seconds.
22 flush_interval = 0.05
22 flush_interval = 0.05
23 topic=None
23 topic=None
24
24
25 def __init__(self, session, pub_socket, name):
25 def __init__(self, session, pub_socket, name):
26 self.session = session
26 self.session = session
27 self.pub_socket = pub_socket
27 self.pub_socket = pub_socket
28 self.name = name
28 self.name = name
29 self.parent_header = {}
29 self.parent_header = {}
30 self._new_buffer()
30 self._new_buffer()
31 self.metadata = {}
31
32
32 def set_parent(self, parent):
33 def set_parent(self, parent):
33 self.parent_header = extract_header(parent)
34 self.parent_header = extract_header(parent)
34
35
36 def set_metadata(self, metadata):
37 self.flush()
38 self.metadata = metadata
39
35 def close(self):
40 def close(self):
36 self.pub_socket = None
41 self.pub_socket = None
37
42
38 def flush(self):
43 def flush(self):
39 #io.rprint('>>>flushing output buffer: %s<<<' % self.name) # dbg
44 #io.rprint('>>>flushing output buffer: %s<<<' % self.name) # dbg
40 if self.pub_socket is None:
45 if self.pub_socket is None:
41 raise ValueError(u'I/O operation on closed file')
46 raise ValueError(u'I/O operation on closed file')
42 else:
47 else:
43 data = self._buffer.getvalue()
48 data = self._buffer.getvalue()
44 if data:
49 if data:
45 content = {u'name':self.name, u'data':data}
50 content = {u'name':self.name, u'data':data}
51 if self.metadata:
52 content['metadata'] = self.metadata
46 msg = self.session.send(self.pub_socket, u'stream', content=content,
53 msg = self.session.send(self.pub_socket, u'stream', content=content,
47 parent=self.parent_header, ident=self.topic)
54 parent=self.parent_header, ident=self.topic)
48
55
49 if hasattr(self.pub_socket, 'flush'):
56 if hasattr(self.pub_socket, 'flush'):
50 # socket itself has flush (presumably ZMQStream)
57 # socket itself has flush (presumably ZMQStream)
51 self.pub_socket.flush()
58 self.pub_socket.flush()
52 self._buffer.close()
59 self._buffer.close()
53 self._new_buffer()
60 self._new_buffer()
54
61
55 def isatty(self):
62 def isatty(self):
56 return False
63 return False
57
64
58 def __next__(self):
65 def __next__(self):
59 raise IOError('Read not supported on a write only stream.')
66 raise IOError('Read not supported on a write only stream.')
60
67
61 if not py3compat.PY3:
68 if not py3compat.PY3:
62 next = __next__
69 next = __next__
63
70
64 def read(self, size=-1):
71 def read(self, size=-1):
65 raise IOError('Read not supported on a write only stream.')
72 raise IOError('Read not supported on a write only stream.')
66
73
67 def readline(self, size=-1):
74 def readline(self, size=-1):
68 raise IOError('Read not supported on a write only stream.')
75 raise IOError('Read not supported on a write only stream.')
69
76
70 def write(self, string):
77 def write(self, string):
71 if self.pub_socket is None:
78 if self.pub_socket is None:
72 raise ValueError('I/O operation on closed file')
79 raise ValueError('I/O operation on closed file')
73 else:
80 else:
74 # Make sure that we're handling unicode
81 # Make sure that we're handling unicode
75 if not isinstance(string, unicode):
82 if not isinstance(string, unicode):
76 enc = encoding.DEFAULT_ENCODING
83 enc = encoding.DEFAULT_ENCODING
77 string = string.decode(enc, 'replace')
84 string = string.decode(enc, 'replace')
78
85
79 self._buffer.write(string)
86 self._buffer.write(string)
80 current_time = time.time()
87 current_time = time.time()
81 if self._start <= 0:
88 if self._start <= 0:
82 self._start = current_time
89 self._start = current_time
83 elif current_time - self._start > self.flush_interval:
90 elif current_time - self._start > self.flush_interval:
84 self.flush()
91 self.flush()
85
92
86 def writelines(self, sequence):
93 def writelines(self, sequence):
87 if self.pub_socket is None:
94 if self.pub_socket is None:
88 raise ValueError('I/O operation on closed file')
95 raise ValueError('I/O operation on closed file')
89 else:
96 else:
90 for string in sequence:
97 for string in sequence:
91 self.write(string)
98 self.write(string)
92
99
93 def _new_buffer(self):
100 def _new_buffer(self):
94 self._buffer = StringIO()
101 self._buffer = StringIO()
95 self._start = -1
102 self._start = -1
General Comments 0
You need to be logged in to leave comments. Login now