##// END OF EJS Templates
ZMQHistoryManager made to work with testing KernelClient which has no history method
ZMQHistoryManager made to work with testing KernelClient which has no history method

File last commit:

r18870:1cf157fa
r18870:1cf157fa
Show More
zmqhistory.py
95 lines | 3.5 KiB | text/x-python | PythonLexer
Doug Blank
Addressed issues: renamed ZMQHistoryManager (appears to do need to do more than just an Accessor) in IPython/terminal/console/zmqhistory.py; added abc methods; rewrote access methods to load the history from the kernel client; works with 'ipython console' and with 'ipython console --kernel'
""" ZMQ Kernel History accessor and manager. """
#-----------------------------------------------------------------------------
# Copyright (C) 2010-2011 The IPython Development Team.
#
# Distributed under the terms of the BSD License.
#
# The full license is in the file COPYING.txt, distributed with this software.
#-----------------------------------------------------------------------------
#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------
from IPython.core.history import HistoryAccessorBase
from IPython.utils.traitlets import Dict, List
try:
from queue import Empty # Py 3
except ImportError:
from Queue import Empty # Py 2
class ZMQHistoryManager(HistoryAccessorBase):
    """History accessor and manager for ZMQ-based kernels.

    Every query method forwards its arguments to the kernel through
    ``self.client`` (see `_load_history`) and returns whatever the kernel
    sends back in the ``history`` field of its reply.
    """

    # Trait declarations with empty defaults.  Nothing in this class writes
    # to them; presumably they exist so that code written against a local
    # history manager still finds these attributes -- TODO confirm.
    input_hist_parsed = List([""])
    output_hist = Dict()
    dir_hist = List()
    output_hist_reprs = Dict()

    def __init__(self, client):
        """
        Class to load the command-line history from a ZMQ-based kernel,
        and access the history.

        Parameters
        ----------
        client: `IPython.kernel.KernelClient`
            The kernel client in order to request the history.
        """
        self.client = client

    def _load_history(self, raw=True, output=False, hist_access_type='range',
                      **kwargs):
        """
        Load the history over ZMQ from the kernel. Wraps the history
        messaging with a loop that waits for the history results.

        Parameters
        ----------
        raw, output, hist_access_type
            Passed unchanged to ``self.client.history``.
        **kwargs
            Any further keyword arguments, also passed unchanged to
            ``self.client.history``.

        Returns
        -------
        The ``history`` entry of the matching reply's content.  An empty
        list is returned when the client has no ``history`` method, when
        the matching reply has no ``history`` entry, or when waiting for a
        shell message raises ``Empty`` before a matching reply is seen.
        """
        # In tests, KernelClient may not have a history method.
        if not hasattr(self.client, "history"):
            return []

        msg_id = self.client.history(raw=raw, output=output,
                                     hist_access_type=hist_access_type,
                                     **kwargs)
        history = []
        while True:
            try:
                reply = self.client.get_shell_msg(timeout=1)
            except Empty:
                # No shell message arrived within the timeout: give up and
                # return the empty default instead of waiting forever.
                break
            # Shell messages that answer a different request are dropped;
            # only the reply whose parent is our request ends the loop.
            if reply['parent_header'].get('msg_id') == msg_id:
                history = reply['content'].get('history', [])
                break
        return history

    def get_tail(self, n=10, raw=True, output=False, include_latest=False):
        """
        Request history from the kernel with access type ``'tail'``.

        ``n``, ``raw`` and ``output`` are forwarded to the request.
        ``include_latest`` is accepted but is not forwarded.
        """
        return self._load_history(hist_access_type='tail', n=n, raw=raw,
                                  output=output)

    def search(self, pattern="*", raw=True, search_raw=True,
               output=False, n=None, unique=False):
        """
        Request history from the kernel with access type ``'search'``.

        All arguments are forwarded to the request unchanged.
        """
        return self._load_history(hist_access_type='search', pattern=pattern,
                                  raw=raw, search_raw=search_raw,
                                  output=output, n=n, unique=unique)

    def get_range(self, session, start=1, stop=None, raw=True, output=False):
        """
        Request history from the kernel with access type ``'range'``.

        ``session``, ``start``, ``stop``, ``raw`` and ``output`` are
        forwarded to the request unchanged.
        """
        return self._load_history(hist_access_type='range', raw=raw,
                                  output=output, start=start, stop=stop,
                                  session=session)

    def get_range_by_str(self, rangestr, raw=True, output=False):
        """
        Request history from the kernel with access type ``'range'``,
        forwarding ``rangestr`` as a keyword argument of the request.
        """
        # NOTE(review): whether the kernel side understands a ``rangestr``
        # field in a 'range' request cannot be told from this file -- confirm.
        return self._load_history(hist_access_type='range', raw=raw,
                                  output=output, rangestr=rangestr)

    def end_session(self):
        """
        Nothing to do for ZMQ-based histories.
        """
        pass

    def reset(self, new_session=True):
        """
        Nothing to do for ZMQ-based histories.

        ``new_session`` is accepted but not used.
        """
        pass