From 9300f847e13784ea0a35ef3d1e05563fef08adfd 2013-02-04 05:12:03
From: Piti Ongmongkolkul
Date: 2013-02-04 05:12:03
Subject: [PATCH] Dual buffer with automatic switch. No performance hit for single process.

---

diff --git a/IPython/kernel/zmq/iostream.py b/IPython/kernel/zmq/iostream.py
index 83505d6..46dc7f8 100644
--- a/IPython/kernel/zmq/iostream.py
+++ b/IPython/kernel/zmq/iostream.py
@@ -1,5 +1,6 @@
 import sys
 import time
+import os
 from io import StringIO
 
 from session import extract_header, Message
@@ -7,6 +8,9 @@ from session import extract_header, Message
 from IPython.utils import io, text
 from IPython.utils import py3compat
 
+import multiprocessing as mp
+import multiprocessing.sharedctypes as mpshc
+from ctypes import c_bool
 #-----------------------------------------------------------------------------
 # Globals
 #-----------------------------------------------------------------------------
@@ -29,6 +33,46 @@ class OutStream(object):
         self.name = name
         self.parent_header = {}
         self._new_buffer()
+        self._manager = mp.Manager()
+        #use sharectype here so it don't have to hit the manager
+        #no synchronize needed either(right?). Just a flag telling the master
+        #to switch the buffer to que
+        self._found_newprocess = mpshc.RawValue(c_bool, False)
+        self._que_buffer = self._manager.Queue()
+        self._que_lock = self._manager.Lock()
+        self._masterpid = os.getpid()
+        self._master_has_switched = False
+
+    def _switch_to_que(self):
+        #should only be called on master process
+        #don't clear the que before putting data in since
+        #child process might have put something in the que before the
+        #master know it.
+        self._que_buffer.put(self._buffer.getvalue())
+        self._new_buffer()
+        self._start = -1
+
+    def _is_master_process(self):
+        return os.getpid()==self._masterpid
+
+    def _debug_print(self,s):
+        sys.__stdout__.write(s+'\n')
+        sys.__stdout__.flush()
+
+    def _check_mp_mode(self):
+        """check multiprocess and switch to que if necessary"""
+        if not self._found_newprocess.value:
+            if not self._is_master_process():
+                self._found_newprocess.value = True
+        elif self._found_newprocess.value and not self._master_has_switched:
+
+            #switch to que if it has not been switch
+            if self._is_master_process():
+                self._switch_to_que()
+                self._master_has_switched = True
+
+        return self._found_newprocess.value
+
 
     def set_parent(self, parent):
         self.parent_header = extract_header(parent)
@@ -38,20 +82,33 @@ class OutStream(object):
 
     def flush(self):
         #io.rprint('>>>flushing output buffer: %s<<<' % self.name)  # dbg
+
         if self.pub_socket is None:
             raise ValueError(u'I/O operation on closed file')
         else:
-            data = self._buffer.getvalue()
-            if data:
-                content = {u'name':self.name, u'data':data}
-                msg = self.session.send(self.pub_socket, u'stream', content=content,
-                                       parent=self.parent_header, ident=self.topic)
-
-                if hasattr(self.pub_socket, 'flush'):
-                    # socket itself has flush (presumably ZMQStream)
-                    self.pub_socket.flush()
-                self._buffer.close()
-                self._new_buffer()
+            if self._is_master_process():
+                data = u''
+                #obtain data
+                if self._check_mp_mode():#multiprocess
+                    with self._que_lock:
+                        while not self._que_buffer.empty():
+                            data += self._que_buffer.get()
+                else:#single process mode
+                    data = self._buffer.getvalue()
+
+                if data:
+                    content = {u'name':self.name, u'data':data}
+                    msg = self.session.send(self.pub_socket, u'stream', content=content,
+                                           parent=self.parent_header, ident=self.topic)
+
+                    if hasattr(self.pub_socket, 'flush'):
+                        # socket itself has flush (presumably ZMQStream)
+                        self.pub_socket.flush()
+                    self._buffer.close()
+                    self._new_buffer()
+            else:
+                pass
+
 
     def isatty(self):
         return False
@@ -76,7 +133,12 @@ class OutStream(object):
             if not isinstance(string, unicode):
                 string = string.decode(self.encoding, 'replace')
 
-            self._buffer.write(string)
+            if self._check_mp_mode(): #multi process mode
+                with self._que_lock:
+                    self._que_buffer.put(string)
+            else: #sigle process mode
+                self._buffer.write(string)
+
             current_time = time.time()
             if self._start <= 0:
                 self._start = current_time