##// END OF EJS Templates
Dual buffer with automatic switch. No performance hit for single process.
Dual buffer with automatic switch. No performance hit for single process.

File last commit:

r9436:9300f847
r9436:9300f847
Show More
iostream.py
157 lines | 5.2 KiB | text/x-python | PythonLexer
Brian Granger
Separating kernel into smaller pieces.
r2754 import sys
import time
Piti Ongmongkolkul
Dual buffer with automatic switch. No performance hit for single process.
r9436 import os
Thomas Kluyver
Handle unicode properly in IPython.zmq.iostream
r4070 from io import StringIO
Brian Granger
Separating kernel into smaller pieces.
r2754
from session import extract_header, Message
Bradley M. Froehle
Use 'UTF-8' as the `encoding` attribute in `OutStream` class....
r8595 from IPython.utils import io, text
Bradley M. Froehle
Apply 2to3 `next` fix....
r7847 from IPython.utils import py3compat
Fernando Perez
Add missing flush of output streams on execute
r2938
Piti Ongmongkolkul
Dual buffer with automatic switch. No performance hit for single process.
r9436 import multiprocessing as mp
import multiprocessing.sharedctypes as mpshc
from ctypes import c_bool
Brian Granger
Separating kernel into smaller pieces.
r2754 #-----------------------------------------------------------------------------
Fernando Perez
Merge branch 'kernel-logging' of https://github.com/omazapa/ipython into omazapa-kernel-logging...
r3322 # Globals
Brian Granger
Separating kernel into smaller pieces.
r2754 #-----------------------------------------------------------------------------
Fernando Perez
Merge branch 'kernel-logging' of https://github.com/omazapa/ipython into omazapa-kernel-logging...
r3322
#-----------------------------------------------------------------------------
Brian Granger
Separating kernel into smaller pieces.
r2754 # Stream classes
#-----------------------------------------------------------------------------
class OutStream(object):
"""A file like object that publishes the stream to a 0MQ PUB socket."""
# The time interval between automatic flushes, in seconds.
flush_interval = 0.05
MinRK
propagate iopub to clients
r3602 topic=None
Brian Granger
Separating kernel into smaller pieces.
r2754 def __init__(self, session, pub_socket, name):
Bradley M. Froehle
Use 'UTF-8' as the `encoding` attribute in `OutStream` class....
r8595 self.encoding = 'UTF-8'
Brian Granger
Separating kernel into smaller pieces.
r2754 self.session = session
self.pub_socket = pub_socket
self.name = name
self.parent_header = {}
self._new_buffer()
Piti Ongmongkolkul
Dual buffer with automatic switch. No performance hit for single process.
r9436 self._manager = mp.Manager()
#use sharectype here so it don't have to hit the manager
#no synchronize needed either(right?). Just a flag telling the master
#to switch the buffer to que
self._found_newprocess = mpshc.RawValue(c_bool, False)
self._que_buffer = self._manager.Queue()
self._que_lock = self._manager.Lock()
self._masterpid = os.getpid()
self._master_has_switched = False
def _switch_to_que(self):
#should only be called on master process
#don't clear the que before putting data in since
#child process might have put something in the que before the
#master know it.
self._que_buffer.put(self._buffer.getvalue())
self._new_buffer()
self._start = -1
def _is_master_process(self):
return os.getpid()==self._masterpid
def _debug_print(self,s):
sys.__stdout__.write(s+'\n')
sys.__stdout__.flush()
def _check_mp_mode(self):
"""check multiprocess and switch to que if necessary"""
if not self._found_newprocess.value:
if not self._is_master_process():
self._found_newprocess.value = True
elif self._found_newprocess.value and not self._master_has_switched:
#switch to que if it has not been switch
if self._is_master_process():
self._switch_to_que()
self._master_has_switched = True
return self._found_newprocess.value
Brian Granger
Separating kernel into smaller pieces.
r2754
def set_parent(self, parent):
self.parent_header = extract_header(parent)
def close(self):
self.pub_socket = None
def flush(self):
Fernando Perez
Add missing flush of output streams on execute
r2938 #io.rprint('>>>flushing output buffer: %s<<<' % self.name) # dbg
Piti Ongmongkolkul
Dual buffer with automatic switch. No performance hit for single process.
r9436
Brian Granger
Separating kernel into smaller pieces.
r2754 if self.pub_socket is None:
raise ValueError(u'I/O operation on closed file')
else:
Piti Ongmongkolkul
Dual buffer with automatic switch. No performance hit for single process.
r9436 if self._is_master_process():
data = u''
#obtain data
if self._check_mp_mode():#multiprocess
with self._que_lock:
while not self._que_buffer.empty():
data += self._que_buffer.get()
else:#single process mode
data = self._buffer.getvalue()
if data:
content = {u'name':self.name, u'data':data}
msg = self.session.send(self.pub_socket, u'stream', content=content,
parent=self.parent_header, ident=self.topic)
if hasattr(self.pub_socket, 'flush'):
# socket itself has flush (presumably ZMQStream)
self.pub_socket.flush()
self._buffer.close()
self._new_buffer()
else:
pass
Brian Granger
Separating kernel into smaller pieces.
r2754
def isatty(self):
return False
Bradley M. Froehle
Apply 2to3 `next` fix....
r7847 def __next__(self):
Brian Granger
Separating kernel into smaller pieces.
r2754 raise IOError('Read not supported on a write only stream.')
Bradley M. Froehle
Apply 2to3 `next` fix....
r7847 if not py3compat.PY3:
next = __next__
Brian Granger
Separating kernel into smaller pieces.
r2754 def read(self, size=-1):
raise IOError('Read not supported on a write only stream.')
def readline(self, size=-1):
raise IOError('Read not supported on a write only stream.')
def write(self, string):
if self.pub_socket is None:
raise ValueError('I/O operation on closed file')
else:
Thomas Kluyver
Handle unicode properly in IPython.zmq.iostream
r4070 # Make sure that we're handling unicode
if not isinstance(string, unicode):
Bradley M. Froehle
Use 'UTF-8' as the `encoding` attribute in `OutStream` class....
r8595 string = string.decode(self.encoding, 'replace')
Piti Ongmongkolkul
Dual buffer with automatic switch. No performance hit for single process.
r9436 if self._check_mp_mode(): #multi process mode
with self._que_lock:
self._que_buffer.put(string)
else: #sigle process mode
self._buffer.write(string)
Brian Granger
Separating kernel into smaller pieces.
r2754 current_time = time.time()
if self._start <= 0:
self._start = current_time
elif current_time - self._start > self.flush_interval:
self.flush()
def writelines(self, sequence):
if self.pub_socket is None:
raise ValueError('I/O operation on closed file')
else:
for string in sequence:
self.write(string)
def _new_buffer(self):
self._buffer = StringIO()
Fernando Perez
Add missing flush of output streams on execute
r2938 self._start = -1