##// END OF EJS Templates
Dual buffer with automatic switch. No performance hit for single process.
Piti Ongmongkolkul -
Show More
@@ -1,5 +1,6 b''
1 1 import sys
2 2 import time
3 import os
3 4 from io import StringIO
4 5
5 6 from session import extract_header, Message
@@ -7,6 +8,9 b' from session import extract_header, Message'
7 8 from IPython.utils import io, text
8 9 from IPython.utils import py3compat
9 10
11 import multiprocessing as mp
12 import multiprocessing.sharedctypes as mpshc
13 from ctypes import c_bool
10 14 #-----------------------------------------------------------------------------
11 15 # Globals
12 16 #-----------------------------------------------------------------------------
@@ -29,6 +33,46 b' class OutStream(object):'
29 33 self.name = name
30 34 self.parent_header = {}
31 35 self._new_buffer()
36 self._manager = mp.Manager()
37 #use sharectype here so it don't have to hit the manager
38 #no synchronize needed either(right?). Just a flag telling the master
39 #to switch the buffer to que
40 self._found_newprocess = mpshc.RawValue(c_bool, False)
41 self._que_buffer = self._manager.Queue()
42 self._que_lock = self._manager.Lock()
43 self._masterpid = os.getpid()
44 self._master_has_switched = False
45
46 def _switch_to_que(self):
47 #should only be called on master process
48 #don't clear the que before putting data in since
49 #child process might have put something in the que before the
50 #master know it.
51 self._que_buffer.put(self._buffer.getvalue())
52 self._new_buffer()
53 self._start = -1
54
55 def _is_master_process(self):
56 return os.getpid()==self._masterpid
57
58 def _debug_print(self,s):
59 sys.__stdout__.write(s+'\n')
60 sys.__stdout__.flush()
61
62 def _check_mp_mode(self):
63 """check multiprocess and switch to que if necessary"""
64 if not self._found_newprocess.value:
65 if not self._is_master_process():
66 self._found_newprocess.value = True
67 elif self._found_newprocess.value and not self._master_has_switched:
68
69 #switch to que if it has not been switch
70 if self._is_master_process():
71 self._switch_to_que()
72 self._master_has_switched = True
73
74 return self._found_newprocess.value
75
32 76
33 77 def set_parent(self, parent):
34 78 self.parent_header = extract_header(parent)
@@ -38,10 +82,20 b' class OutStream(object):'
38 82
39 83 def flush(self):
40 84 #io.rprint('>>>flushing output buffer: %s<<<' % self.name) # dbg
85
41 86 if self.pub_socket is None:
42 87 raise ValueError(u'I/O operation on closed file')
43 88 else:
89 if self._is_master_process():
90 data = u''
91 #obtain data
92 if self._check_mp_mode():#multiprocess
93 with self._que_lock:
94 while not self._que_buffer.empty():
95 data += self._que_buffer.get()
96 else:#single process mode
44 97 data = self._buffer.getvalue()
98
45 99 if data:
46 100 content = {u'name':self.name, u'data':data}
47 101 msg = self.session.send(self.pub_socket, u'stream', content=content,
@@ -52,6 +106,9 b' class OutStream(object):'
52 106 self.pub_socket.flush()
53 107 self._buffer.close()
54 108 self._new_buffer()
109 else:
110 pass
111
55 112
56 113 def isatty(self):
57 114 return False
@@ -76,7 +133,12 b' class OutStream(object):'
76 133 if not isinstance(string, unicode):
77 134 string = string.decode(self.encoding, 'replace')
78 135
136 if self._check_mp_mode(): #multi process mode
137 with self._que_lock:
138 self._que_buffer.put(string)
139 else: #sigle process mode
79 140 self._buffer.write(string)
141
80 142 current_time = time.time()
81 143 if self._start <= 0:
82 144 self._start = current_time
General Comments 0
You need to be logged in to leave comments. Login now