##// END OF EJS Templates
Dual buffer with automatic switch. No performance hit for single process.
Piti Ongmongkolkul -
Show More
@@ -1,95 +1,157 b''
1 import sys
1 import sys
2 import time
2 import time
3 import os
3 from io import StringIO
4 from io import StringIO
4
5
5 from session import extract_header, Message
6 from session import extract_header, Message
6
7
7 from IPython.utils import io, text
8 from IPython.utils import io, text
8 from IPython.utils import py3compat
9 from IPython.utils import py3compat
9
10
11 import multiprocessing as mp
12 import multiprocessing.sharedctypes as mpshc
13 from ctypes import c_bool
10 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
11 # Globals
15 # Globals
12 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
13
17
14 #-----------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
15 # Stream classes
19 # Stream classes
16 #-----------------------------------------------------------------------------
20 #-----------------------------------------------------------------------------
17
21
18 class OutStream(object):
22 class OutStream(object):
19 """A file like object that publishes the stream to a 0MQ PUB socket."""
23 """A file like object that publishes the stream to a 0MQ PUB socket."""
20
24
21 # The time interval between automatic flushes, in seconds.
25 # The time interval between automatic flushes, in seconds.
22 flush_interval = 0.05
26 flush_interval = 0.05
23 topic=None
27 topic=None
24
28
25 def __init__(self, session, pub_socket, name):
29 def __init__(self, session, pub_socket, name):
26 self.encoding = 'UTF-8'
30 self.encoding = 'UTF-8'
27 self.session = session
31 self.session = session
28 self.pub_socket = pub_socket
32 self.pub_socket = pub_socket
29 self.name = name
33 self.name = name
30 self.parent_header = {}
34 self.parent_header = {}
31 self._new_buffer()
35 self._new_buffer()
36 self._manager = mp.Manager()
37 #use sharectype here so it don't have to hit the manager
38 #no synchronize needed either(right?). Just a flag telling the master
39 #to switch the buffer to que
40 self._found_newprocess = mpshc.RawValue(c_bool, False)
41 self._que_buffer = self._manager.Queue()
42 self._que_lock = self._manager.Lock()
43 self._masterpid = os.getpid()
44 self._master_has_switched = False
45
46 def _switch_to_que(self):
47 #should only be called on master process
48 #don't clear the que before putting data in since
49 #child process might have put something in the que before the
50 #master know it.
51 self._que_buffer.put(self._buffer.getvalue())
52 self._new_buffer()
53 self._start = -1
54
55 def _is_master_process(self):
56 return os.getpid()==self._masterpid
57
58 def _debug_print(self,s):
59 sys.__stdout__.write(s+'\n')
60 sys.__stdout__.flush()
61
62 def _check_mp_mode(self):
63 """check multiprocess and switch to que if necessary"""
64 if not self._found_newprocess.value:
65 if not self._is_master_process():
66 self._found_newprocess.value = True
67 elif self._found_newprocess.value and not self._master_has_switched:
68
69 #switch to que if it has not been switch
70 if self._is_master_process():
71 self._switch_to_que()
72 self._master_has_switched = True
73
74 return self._found_newprocess.value
75
32
76
33 def set_parent(self, parent):
77 def set_parent(self, parent):
34 self.parent_header = extract_header(parent)
78 self.parent_header = extract_header(parent)
35
79
36 def close(self):
80 def close(self):
37 self.pub_socket = None
81 self.pub_socket = None
38
82
39 def flush(self):
83 def flush(self):
40 #io.rprint('>>>flushing output buffer: %s<<<' % self.name) # dbg
84 #io.rprint('>>>flushing output buffer: %s<<<' % self.name) # dbg
85
41 if self.pub_socket is None:
86 if self.pub_socket is None:
42 raise ValueError(u'I/O operation on closed file')
87 raise ValueError(u'I/O operation on closed file')
43 else:
88 else:
44 data = self._buffer.getvalue()
89 if self._is_master_process():
45 if data:
90 data = u''
46 content = {u'name':self.name, u'data':data}
91 #obtain data
47 msg = self.session.send(self.pub_socket, u'stream', content=content,
92 if self._check_mp_mode():#multiprocess
48 parent=self.parent_header, ident=self.topic)
93 with self._que_lock:
49
94 while not self._que_buffer.empty():
50 if hasattr(self.pub_socket, 'flush'):
95 data += self._que_buffer.get()
51 # socket itself has flush (presumably ZMQStream)
96 else:#single process mode
52 self.pub_socket.flush()
97 data = self._buffer.getvalue()
53 self._buffer.close()
98
54 self._new_buffer()
99 if data:
100 content = {u'name':self.name, u'data':data}
101 msg = self.session.send(self.pub_socket, u'stream', content=content,
102 parent=self.parent_header, ident=self.topic)
103
104 if hasattr(self.pub_socket, 'flush'):
105 # socket itself has flush (presumably ZMQStream)
106 self.pub_socket.flush()
107 self._buffer.close()
108 self._new_buffer()
109 else:
110 pass
111
55
112
56 def isatty(self):
113 def isatty(self):
57 return False
114 return False
58
115
59 def __next__(self):
116 def __next__(self):
60 raise IOError('Read not supported on a write only stream.')
117 raise IOError('Read not supported on a write only stream.')
61
118
62 if not py3compat.PY3:
119 if not py3compat.PY3:
63 next = __next__
120 next = __next__
64
121
65 def read(self, size=-1):
122 def read(self, size=-1):
66 raise IOError('Read not supported on a write only stream.')
123 raise IOError('Read not supported on a write only stream.')
67
124
68 def readline(self, size=-1):
125 def readline(self, size=-1):
69 raise IOError('Read not supported on a write only stream.')
126 raise IOError('Read not supported on a write only stream.')
70
127
71 def write(self, string):
128 def write(self, string):
72 if self.pub_socket is None:
129 if self.pub_socket is None:
73 raise ValueError('I/O operation on closed file')
130 raise ValueError('I/O operation on closed file')
74 else:
131 else:
75 # Make sure that we're handling unicode
132 # Make sure that we're handling unicode
76 if not isinstance(string, unicode):
133 if not isinstance(string, unicode):
77 string = string.decode(self.encoding, 'replace')
134 string = string.decode(self.encoding, 'replace')
78
135
79 self._buffer.write(string)
136 if self._check_mp_mode(): #multi process mode
137 with self._que_lock:
138 self._que_buffer.put(string)
139 else: #sigle process mode
140 self._buffer.write(string)
141
80 current_time = time.time()
142 current_time = time.time()
81 if self._start <= 0:
143 if self._start <= 0:
82 self._start = current_time
144 self._start = current_time
83 elif current_time - self._start > self.flush_interval:
145 elif current_time - self._start > self.flush_interval:
84 self.flush()
146 self.flush()
85
147
86 def writelines(self, sequence):
148 def writelines(self, sequence):
87 if self.pub_socket is None:
149 if self.pub_socket is None:
88 raise ValueError('I/O operation on closed file')
150 raise ValueError('I/O operation on closed file')
89 else:
151 else:
90 for string in sequence:
152 for string in sequence:
91 self.write(string)
153 self.write(string)
92
154
93 def _new_buffer(self):
155 def _new_buffer(self):
94 self._buffer = StringIO()
156 self._buffer = StringIO()
95 self._start = -1
157 self._start = -1
General Comments 0
You need to be logged in to leave comments. Login now