##// END OF EJS Templates
add File/Rename
add File/Rename

File last commit:

r18870:1cf157fa
r19317:69060e0a
Show More
zmqhistory.py
95 lines | 3.5 KiB | text/x-python | PythonLexer
Doug Blank
Addressed issues: renamed ZMQHistoryManager (appears to do need to do more than just an Accessor) in IPython/terminal/console/zmqhistory.py; added abc methods; rewrote access methods to load the history from the kernel client; works with 'ipython console' and with 'ipython console --kernel'
r18846 """ ZMQ Kernel History accessor and manager. """
#-----------------------------------------------------------------------------
# Copyright (C) 2010-2011 The IPython Development Team.
#
# Distributed under the terms of the BSD License.
#
# The full license is in the file COPYING.txt, distributed with this software.
#-----------------------------------------------------------------------------
#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------
from IPython.core.history import HistoryAccessorBase
from IPython.utils.traitlets import Dict, List
try:
from queue import Empty # Py 3
except ImportError:
from Queue import Empty # Py 2
class ZMQHistoryManager(HistoryAccessorBase):
    """History accessor and manager for ZMQ-based kernels.

    No history is stored locally: every query is forwarded to the kernel
    through the kernel client and answered from the reply that comes back
    on the shell channel.
    """

    # Nothing in this class reads or writes these containers.
    # NOTE(review): presumably they mirror attributes of the in-process
    # history manager so that code expecting one keeps working -- confirm.
    input_hist_parsed = List([""])
    output_hist = Dict()
    dir_hist = List()
    output_hist_reprs = Dict()

    def __init__(self, client):
        """
        Class to load the command-line history from a ZMQ-based kernel,
        and access the history.

        Parameters
        ----------

        client: `IPython.kernel.KernelClient`
          The kernel client in order to request the history.
        """
        # Run the base-class initializer so the inherited machinery is set
        # up the same way as for every other accessor (two-argument form of
        # super() because this module still supports Python 2).
        super(ZMQHistoryManager, self).__init__()
        self.client = client

    def _load_history(self, raw=True, output=False, hist_access_type='range',
                      **kwargs):
        """
        Load the history over ZMQ from the kernel. Wraps the history
        messaging with loop to wait to get history results.

        Parameters
        ----------

        raw, output, hist_access_type
          Forwarded unchanged to ``client.history``.
        **kwargs
          Extra fields for the request, forwarded unchanged as well.

        Returns
        -------

        The ``history`` entry of the matching reply's content. An empty
        list is returned when the client has no ``history`` method, when
        no shell message arrives within one second, or when the reply
        carries no ``history`` entry.
        """
        # In tests, KernelClient may not have a history method
        if not hasattr(self.client, "history"):
            return []

        msg_id = self.client.history(raw=raw, output=output,
                                     hist_access_type=hist_access_type,
                                     **kwargs)
        while True:
            try:
                reply = self.client.get_shell_msg(timeout=1)
            except Empty:
                # Nothing (more) arrived on the shell channel in time.
                return []
            # Shell messages answering other requests are skipped; only
            # the reply whose parent is our request ends the loop.
            if reply['parent_header'].get('msg_id') == msg_id:
                return reply['content'].get('history', [])

    def get_tail(self, n=10, raw=True, output=False, include_latest=False):
        """
        Request the last ``n`` history entries from the kernel.

        ``include_latest`` is accepted but not used: it is not forwarded
        with the request.
        """
        return self._load_history(hist_access_type='tail', n=n, raw=raw,
                                  output=output)

    def search(self, pattern="*", raw=True, search_raw=True,
               output=False, n=None, unique=False):
        """
        Request the history entries matching ``pattern`` from the kernel.

        Every argument is forwarded unchanged with the ``search`` request.
        """
        return self._load_history(hist_access_type='search', pattern=pattern,
                                  raw=raw, search_raw=search_raw,
                                  output=output, n=n, unique=unique)

    def get_range(self, session, start=1, stop=None, raw=True, output=False):
        """
        Request the entries ``start``..``stop`` of ``session`` from the
        kernel.

        Every argument is forwarded unchanged with the ``range`` request.
        """
        return self._load_history(hist_access_type='range', raw=raw,
                                  output=output, start=start, stop=stop,
                                  session=session)

    def get_range_by_str(self, rangestr, raw=True, output=False):
        """
        Request the history described by a range string from the kernel.

        The string is parsed on the client side into
        ``(session, start, stop)`` triples and one ``range`` request is
        issued per triple through `get_range`; the replies are
        concatenated in order.

        NOTE(review): this replaces a single ``range`` request that
        carried a ``rangestr`` field and no session/start/stop. The
        history request message does not appear to define such a field --
        confirm against the messaging specification.

        Returns
        -------

        A list with the entries of all requested ranges.
        """
        # Imported locally so this method does not rely on a change to the
        # module-level imports.
        from IPython.core.history import extract_hist_ranges

        history = []
        for session, start, stop in extract_hist_ranges(rangestr):
            history.extend(self.get_range(session, start, stop,
                                          raw=raw, output=output))
        return history

    def end_session(self):
        """
        Nothing to do for ZMQ-based histories.
        """
        pass

    def reset(self, new_session=True):
        """
        Nothing to do for ZMQ-based histories.
        """
        pass
Doug Blank
Addressed issues: renamed ZMQHistoryManager (appears to do need to do more than just an Accessor) in IPython/terminal/console/zmqhistory.py; added abc methods; rewrote access methods to load the history from the kernel client; works with 'ipython console' and with 'ipython console --kernel'
r18846