import sys import time import os from io import StringIO from session import extract_header, Message from IPython.utils import io, text from IPython.utils import py3compat import multiprocessing as mp import multiprocessing.sharedctypes as mpshc from ctypes import c_bool #----------------------------------------------------------------------------- # Globals #----------------------------------------------------------------------------- #----------------------------------------------------------------------------- # Stream classes #----------------------------------------------------------------------------- class OutStream(object): """A file like object that publishes the stream to a 0MQ PUB socket.""" # The time interval between automatic flushes, in seconds. flush_interval = 0.05 topic=None def __init__(self, session, pub_socket, name): self.encoding = 'UTF-8' self.session = session self.pub_socket = pub_socket self.name = name self.parent_header = {} self._new_buffer() self._manager = mp.Manager() #use sharectype here so it don't have to hit the manager #no synchronize needed either(right?). Just a flag telling the master #to switch the buffer to que self._found_newprocess = mpshc.RawValue(c_bool, False) self._que_buffer = self._manager.Queue() self._que_lock = self._manager.Lock() self._masterpid = os.getpid() self._master_has_switched = False def _switch_to_que(self): #should only be called on master process #don't clear the que before putting data in since #child process might have put something in the que before the #master know it. self._que_buffer.put(self._buffer.getvalue()) self._new_buffer() self._start = -1 def _is_master_process(self): return os.getpid()==self._masterpid def _debug_print(self,s): sys.__stdout__.write(s+'\n') sys.__stdout__.flush() def _check_mp_mode(self): """check multiprocess and switch to que if necessary""" if not self._found_newprocess.value: if not self._is_master_process(): self._found_newprocess.value = True elif self._found_newprocess.value and not self._master_has_switched: #switch to que if it has not been switch if self._is_master_process(): self._switch_to_que() self._master_has_switched = True return self._found_newprocess.value def set_parent(self, parent): self.parent_header = extract_header(parent) def close(self): self.pub_socket = None def flush(self): #io.rprint('>>>flushing output buffer: %s<<<' % self.name) # dbg if self.pub_socket is None: raise ValueError(u'I/O operation on closed file') else: if self._is_master_process(): data = u'' #obtain data if self._check_mp_mode():#multiprocess with self._que_lock: while not self._que_buffer.empty(): data += self._que_buffer.get() else:#single process mode data = self._buffer.getvalue() if data: content = {u'name':self.name, u'data':data} msg = self.session.send(self.pub_socket, u'stream', content=content, parent=self.parent_header, ident=self.topic) if hasattr(self.pub_socket, 'flush'): # socket itself has flush (presumably ZMQStream) self.pub_socket.flush() self._buffer.close() self._new_buffer() else: pass def isatty(self): return False def __next__(self): raise IOError('Read not supported on a write only stream.') if not py3compat.PY3: next = __next__ def read(self, size=-1): raise IOError('Read not supported on a write only stream.') def readline(self, size=-1): raise IOError('Read not supported on a write only stream.') def write(self, string): if self.pub_socket is None: raise ValueError('I/O operation on closed file') else: # Make sure that we're handling unicode if not isinstance(string, unicode): string = string.decode(self.encoding, 'replace') if self._check_mp_mode(): #multi process mode with self._que_lock: self._que_buffer.put(string) else: #sigle process mode self._buffer.write(string) current_time = time.time() if self._start <= 0: self._start = current_time elif current_time - self._start > self.flush_interval: self.flush() def writelines(self, sequence): if self.pub_socket is None: raise ValueError('I/O operation on closed file') else: for string in sequence: self.write(string) def _new_buffer(self): self._buffer = StringIO() self._start = -1