##// END OF EJS Templates
play nice with py3k...
play nice with py3k python 3 doesn't seem to like the parametric, plus a few missing print parentheses

File last commit:

r9440:0a3cc728
r9442:82de8fc6
Show More
iostream.py
239 lines | 8.1 KiB | text/x-python | PythonLexer
Brian Granger
Separating kernel into smaller pieces.
r2754 import sys
import time
Piti Ongmongkolkul
Dual buffer with automatic switch. No performance hit for single process.
r9436 import os
MinRK
use zmq to protect subprocess stdout after fork
r9440 import threading
import uuid
Thomas Kluyver
Handle unicode properly in IPython.zmq.iostream
r4070 from io import StringIO
Brian Granger
Separating kernel into smaller pieces.
r2754
MinRK
use zmq to protect subprocess stdout after fork
r9440 import zmq
Brian Granger
Separating kernel into smaller pieces.
r2754 from session import extract_header, Message
Bradley M. Froehle
Use 'UTF-8' as the `encoding` attribute in `OutStream` class....
r8595 from IPython.utils import io, text
Bradley M. Froehle
Apply 2to3 `next` fix....
r7847 from IPython.utils import py3compat
Fernando Perez
Add missing flush of output streams on execute
r2938
Piti Ongmongkolkul
Dual buffer with automatic switch. No performance hit for single process.
r9436 import multiprocessing as mp
MinRK
use zmq to protect subprocess stdout after fork
r9440 # import multiprocessing.sharedctypes as mpshc
Piti Ongmongkolkul
Dual buffer with automatic switch. No performance hit for single process.
r9436 from ctypes import c_bool
Brian Granger
Separating kernel into smaller pieces.
r2754 #-----------------------------------------------------------------------------
Fernando Perez
Merge branch 'kernel-logging' of https://github.com/omazapa/ipython into omazapa-kernel-logging...
r3322 # Globals
Brian Granger
Separating kernel into smaller pieces.
r2754 #-----------------------------------------------------------------------------
Fernando Perez
Merge branch 'kernel-logging' of https://github.com/omazapa/ipython into omazapa-kernel-logging...
r3322
MinRK
use zmq to protect subprocess stdout after fork
r9440 MASTER_NO_CHILDREN = 0
MASTER_WITH_CHILDREN = 1
CHILD = 2
Fernando Perez
Merge branch 'kernel-logging' of https://github.com/omazapa/ipython into omazapa-kernel-logging...
r3322 #-----------------------------------------------------------------------------
Brian Granger
Separating kernel into smaller pieces.
r2754 # Stream classes
#-----------------------------------------------------------------------------
class OutStream(object):
"""A file like object that publishes the stream to a 0MQ PUB socket."""
# The time interval between automatic flushes, in seconds.
flush_interval = 0.05
MinRK
propagate iopub to clients
r3602 topic=None
Brian Granger
Separating kernel into smaller pieces.
r2754 def __init__(self, session, pub_socket, name):
Bradley M. Froehle
Use 'UTF-8' as the `encoding` attribute in `OutStream` class....
r8595 self.encoding = 'UTF-8'
Brian Granger
Separating kernel into smaller pieces.
r2754 self.session = session
self.pub_socket = pub_socket
self.name = name
self.parent_header = {}
self._new_buffer()
MinRK
use zmq to protect subprocess stdout after fork
r9440 self._found_newprocess = threading.Event()
self._buffer_lock = threading.Lock()
self._master_pid = os.getpid()
self._master_thread = threading.current_thread().ident
self._pipe_pid = os.getpid()
self._setup_pipe_in()
def _setup_pipe_in(self):
"""setup listening pipe for subprocesses"""
ctx = self._pipe_ctx = zmq.Context()
# signal pair for terminating background thread
self._pipe_signaler = ctx.socket(zmq.PAIR)
self._pipe_signalee = ctx.socket(zmq.PAIR)
self._pipe_signaler.bind("inproc://ostream_pipe")
self._pipe_signalee.connect("inproc://ostream_pipe")
# thread event to signal cleanup is done
self._pipe_done = threading.Event()
# use UUID to authenticate pipe messages
self._pipe_uuid = uuid.uuid4().bytes
self._pipe_thread = threading.Thread(target=self._pipe_main)
self._pipe_thread.start()
def _setup_pipe_out(self):
# must be new context after fork
ctx = zmq.Context()
self._pipe_pid = os.getpid()
self._pipe_out = ctx.socket(zmq.PUSH)
self._pipe_out_lock = threading.Lock()
self._pipe_out.connect("tcp://127.0.0.1:%i" % self._pipe_port)
def _pipe_main(self):
"""eventloop for receiving"""
ctx = self._pipe_ctx
self._pipe_in = ctx.socket(zmq.PULL)
self._pipe_port = self._pipe_in.bind_to_random_port("tcp://127.0.0.1")
poller = zmq.Poller()
poller.register(self._pipe_signalee, zmq.POLLIN)
poller.register(self._pipe_in, zmq.POLLIN)
while True:
if not self._is_master_process():
return
try:
events = dict(poller.poll(1000))
except zmq.ZMQError:
# should only be triggered by process-ending cleanup
return
if self._pipe_signalee in events:
break
if self._pipe_in in events:
msg = self._pipe_in.recv_multipart()
if msg[0] != self._pipe_uuid:
# message not authenticated
continue
self._found_newprocess.set()
text = msg[1].decode(self.encoding, 'replace')
with self._buffer_lock:
self._buffer.write(text)
if self._start < 0:
self._start = time.time()
# wrap it up
self._pipe_signaler.close()
self._pipe_signalee.close()
self._pipe_in.close()
self._pipe_ctx.term()
self._pipe_done.set()
def __del__(self):
if not self._is_master_process():
return
self._pipe_signaler.send(b'die')
self._pipe_done.wait(10)
Piti Ongmongkolkul
Dual buffer with automatic switch. No performance hit for single process.
r9436 def _is_master_process(self):
MinRK
use zmq to protect subprocess stdout after fork
r9440 return os.getpid() == self._master_pid
def _is_master_thread(self):
return threading.current_thread().ident == self._master_thread
def _have_pipe_out(self):
return os.getpid() == self._pipe_pid
Piti Ongmongkolkul
Dual buffer with automatic switch. No performance hit for single process.
r9436
def _check_mp_mode(self):
MinRK
use zmq to protect subprocess stdout after fork
r9440 """check for forks, and switch to zmq pipeline if necessary"""
if self._is_master_process():
if self._found_newprocess.is_set():
return MASTER_WITH_CHILDREN
else:
return MASTER_NO_CHILDREN
else:
if not self._have_pipe_out():
# setup a new out pipe
self._setup_pipe_out()
return CHILD
Brian Granger
Separating kernel into smaller pieces.
r2754
def set_parent(self, parent):
self.parent_header = extract_header(parent)
def close(self):
self.pub_socket = None
def flush(self):
MinRK
use zmq to protect subprocess stdout after fork
r9440 """trigger actual zmq send"""
Brian Granger
Separating kernel into smaller pieces.
r2754 if self.pub_socket is None:
raise ValueError(u'I/O operation on closed file')
else:
Piti Ongmongkolkul
Dual buffer with automatic switch. No performance hit for single process.
r9436 if self._is_master_process():
MinRK
use zmq to protect subprocess stdout after fork
r9440 if not self._is_master_thread():
# sub-threads mustn't trigger flush,
# but at least they can force the timer.
self._start = 0
Piti Ongmongkolkul
Dual buffer with automatic switch. No performance hit for single process.
r9436 data = u''
MinRK
use zmq to protect subprocess stdout after fork
r9440 # obtain data
if self._check_mp_mode(): # multiprocess, needs a lock
with self._buffer_lock:
data = self._buffer.getvalue()
self._buffer.close()
self._new_buffer()
else: # single process mode
Piti Ongmongkolkul
Dual buffer with automatic switch. No performance hit for single process.
r9436 data = self._buffer.getvalue()
MinRK
use zmq to protect subprocess stdout after fork
r9440 self._buffer.close()
self._new_buffer()
Piti Ongmongkolkul
Dual buffer with automatic switch. No performance hit for single process.
r9436
if data:
content = {u'name':self.name, u'data':data}
msg = self.session.send(self.pub_socket, u'stream', content=content,
parent=self.parent_header, ident=self.topic)
if hasattr(self.pub_socket, 'flush'):
# socket itself has flush (presumably ZMQStream)
self.pub_socket.flush()
else:
MinRK
use zmq to protect subprocess stdout after fork
r9440 self._check_mp_mode()
with self._pipe_out_lock:
tracker = self._pipe_out.send(b'', copy=False, track=True)
tracker.wait(1)
Piti Ongmongkolkul
Dual buffer with automatic switch. No performance hit for single process.
r9436
Brian Granger
Separating kernel into smaller pieces.
r2754
def isatty(self):
return False
Bradley M. Froehle
Apply 2to3 `next` fix....
r7847 def __next__(self):
Brian Granger
Separating kernel into smaller pieces.
r2754 raise IOError('Read not supported on a write only stream.')
Bradley M. Froehle
Apply 2to3 `next` fix....
r7847 if not py3compat.PY3:
next = __next__
Brian Granger
Separating kernel into smaller pieces.
r2754 def read(self, size=-1):
raise IOError('Read not supported on a write only stream.')
def readline(self, size=-1):
raise IOError('Read not supported on a write only stream.')
def write(self, string):
if self.pub_socket is None:
raise ValueError('I/O operation on closed file')
else:
Thomas Kluyver
Handle unicode properly in IPython.zmq.iostream
r4070 # Make sure that we're handling unicode
if not isinstance(string, unicode):
Bradley M. Froehle
Use 'UTF-8' as the `encoding` attribute in `OutStream` class....
r8595 string = string.decode(self.encoding, 'replace')
MinRK
use zmq to protect subprocess stdout after fork
r9440
mp_mode = self._check_mp_mode()
if mp_mode == CHILD:
with self._pipe_out_lock:
self._pipe_out.send_multipart([
self._pipe_uuid,
string.encode(self.encoding, 'replace'),
])
return
elif mp_mode == MASTER_NO_CHILDREN:
Piti Ongmongkolkul
Dual buffer with automatic switch. No performance hit for single process.
r9436 self._buffer.write(string)
MinRK
use zmq to protect subprocess stdout after fork
r9440 elif mp_mode == MASTER_WITH_CHILDREN:
with self._buffer_lock:
self._buffer.write(string)
Piti Ongmongkolkul
Dual buffer with automatic switch. No performance hit for single process.
r9436
Brian Granger
Separating kernel into smaller pieces.
r2754 current_time = time.time()
MinRK
use zmq to protect subprocess stdout after fork
r9440 if self._start < 0:
Brian Granger
Separating kernel into smaller pieces.
r2754 self._start = current_time
elif current_time - self._start > self.flush_interval:
self.flush()
def writelines(self, sequence):
if self.pub_socket is None:
raise ValueError('I/O operation on closed file')
else:
for string in sequence:
self.write(string)
def _new_buffer(self):
self._buffer = StringIO()
Fernando Perez
Add missing flush of output streams on execute
r2938 self._start = -1