|
|
import re
|
|
|
import tokenize
|
|
|
from io import StringIO
|
|
|
from typing import Callable, List, Optional, Union, Generator, Tuple, Sequence
|
|
|
|
|
|
from prompt_toolkit.buffer import Buffer
|
|
|
from prompt_toolkit.key_binding import KeyPressEvent
|
|
|
from prompt_toolkit.key_binding.bindings import named_commands as nc
|
|
|
from prompt_toolkit.auto_suggest import AutoSuggestFromHistory, Suggestion
|
|
|
from prompt_toolkit.document import Document
|
|
|
from prompt_toolkit.history import History
|
|
|
from prompt_toolkit.shortcuts import PromptSession
|
|
|
from prompt_toolkit.layout.processors import (
|
|
|
Processor,
|
|
|
Transformation,
|
|
|
TransformationInput,
|
|
|
)
|
|
|
|
|
|
from IPython.utils.tokenutil import generate_tokens
|
|
|
|
|
|
|
|
|
def _get_query(document: Document):
|
|
|
return document.lines[document.cursor_position_row]
|
|
|
|
|
|
|
|
|
class AppendAutoSuggestionInAnyLine(Processor):
|
|
|
"""
|
|
|
Append the auto suggestion to lines other than the last (appending to the
|
|
|
last line is natively supported by the prompt toolkit).
|
|
|
"""
|
|
|
|
|
|
def __init__(self, style: str = "class:auto-suggestion") -> None:
|
|
|
self.style = style
|
|
|
|
|
|
def apply_transformation(self, ti: TransformationInput) -> Transformation:
|
|
|
is_last_line = ti.lineno == ti.document.line_count - 1
|
|
|
is_active_line = ti.lineno == ti.document.cursor_position_row
|
|
|
|
|
|
if not is_last_line and is_active_line:
|
|
|
buffer = ti.buffer_control.buffer
|
|
|
|
|
|
if buffer.suggestion and ti.document.is_cursor_at_the_end_of_line:
|
|
|
suggestion = buffer.suggestion.text
|
|
|
else:
|
|
|
suggestion = ""
|
|
|
|
|
|
return Transformation(fragments=ti.fragments + [(self.style, suggestion)])
|
|
|
else:
|
|
|
return Transformation(fragments=ti.fragments)
|
|
|
|
|
|
|
|
|
class NavigableAutoSuggestFromHistory(AutoSuggestFromHistory):
|
|
|
"""
|
|
|
A subclass of AutoSuggestFromHistory that allow navigation to next/previous
|
|
|
suggestion from history. To do so it remembers the current position, but it
|
|
|
state need to carefully be cleared on the right events.
|
|
|
"""
|
|
|
|
|
|
def __init__(
|
|
|
self,
|
|
|
):
|
|
|
self.skip_lines = 0
|
|
|
self._connected_apps = []
|
|
|
|
|
|
def reset_history_position(self, _: Buffer):
|
|
|
self.skip_lines = 0
|
|
|
|
|
|
def disconnect(self):
|
|
|
for pt_app in self._connected_apps:
|
|
|
text_insert_event = pt_app.default_buffer.on_text_insert
|
|
|
text_insert_event.remove_handler(self.reset_history_position)
|
|
|
|
|
|
def connect(self, pt_app: PromptSession):
|
|
|
self._connected_apps.append(pt_app)
|
|
|
# note: `on_text_changed` could be used for a bit different behaviour
|
|
|
# on character deletion (i.e. reseting history position on backspace)
|
|
|
pt_app.default_buffer.on_text_insert.add_handler(self.reset_history_position)
|
|
|
|
|
|
def get_suggestion(
|
|
|
self, buffer: Buffer, document: Document
|
|
|
) -> Optional[Suggestion]:
|
|
|
text = _get_query(document)
|
|
|
|
|
|
if text.strip():
|
|
|
for suggestion, _ in self._find_next_match(
|
|
|
text, self.skip_lines, buffer.history
|
|
|
):
|
|
|
return Suggestion(suggestion)
|
|
|
|
|
|
return None
|
|
|
|
|
|
def _find_match(
|
|
|
self, text: str, skip_lines: float, history: History, previous: bool
|
|
|
) -> Generator[Tuple[str, float], None, None]:
|
|
|
"""
|
|
|
text : str
|
|
|
Text content to find a match for, the user cursor is most of the
|
|
|
time at the end of this text.
|
|
|
skip_lines : float
|
|
|
number of items to skip in the search, this is used to indicate how
|
|
|
far in the list the user has navigated by pressing up or down.
|
|
|
The float type is used as the base value is +inf
|
|
|
history : History
|
|
|
prompt_toolkit History instance to fetch previous entries from.
|
|
|
previous : bool
|
|
|
Direction of the search, whether we are looking previous match
|
|
|
(True), or next match (False).
|
|
|
|
|
|
Yields
|
|
|
------
|
|
|
Tuple with:
|
|
|
str:
|
|
|
current suggestion.
|
|
|
float:
|
|
|
will actually yield only ints, which is passed back via skip_lines,
|
|
|
which may be a +inf (float)
|
|
|
|
|
|
|
|
|
"""
|
|
|
line_number = -1
|
|
|
for string in reversed(list(history.get_strings())):
|
|
|
for line in reversed(string.splitlines()):
|
|
|
line_number += 1
|
|
|
if not previous and line_number < skip_lines:
|
|
|
continue
|
|
|
# do not return empty suggestions as these
|
|
|
# close the auto-suggestion overlay (and are useless)
|
|
|
if line.startswith(text) and len(line) > len(text):
|
|
|
yield line[len(text) :], line_number
|
|
|
if previous and line_number >= skip_lines:
|
|
|
return
|
|
|
|
|
|
def _find_next_match(
|
|
|
self, text: str, skip_lines: float, history: History
|
|
|
) -> Generator[Tuple[str, float], None, None]:
|
|
|
return self._find_match(text, skip_lines, history, previous=False)
|
|
|
|
|
|
def _find_previous_match(self, text: str, skip_lines: float, history: History):
|
|
|
return reversed(
|
|
|
list(self._find_match(text, skip_lines, history, previous=True))
|
|
|
)
|
|
|
|
|
|
def up(self, query: str, other_than: str, history: History) -> None:
|
|
|
for suggestion, line_number in self._find_next_match(
|
|
|
query, self.skip_lines, history
|
|
|
):
|
|
|
# if user has history ['very.a', 'very', 'very.b'] and typed 'very'
|
|
|
# we want to switch from 'very.b' to 'very.a' because a) if the
|
|
|
# suggestion equals current text, prompt-toolkit aborts suggesting
|
|
|
# b) user likely would not be interested in 'very' anyways (they
|
|
|
# already typed it).
|
|
|
if query + suggestion != other_than:
|
|
|
self.skip_lines = line_number
|
|
|
break
|
|
|
else:
|
|
|
# no matches found, cycle back to beginning
|
|
|
self.skip_lines = 0
|
|
|
|
|
|
def down(self, query: str, other_than: str, history: History) -> None:
|
|
|
for suggestion, line_number in self._find_previous_match(
|
|
|
query, self.skip_lines, history
|
|
|
):
|
|
|
if query + suggestion != other_than:
|
|
|
self.skip_lines = line_number
|
|
|
break
|
|
|
else:
|
|
|
# no matches found, cycle to end
|
|
|
for suggestion, line_number in self._find_previous_match(
|
|
|
query, float("Inf"), history
|
|
|
):
|
|
|
if query + suggestion != other_than:
|
|
|
self.skip_lines = line_number
|
|
|
break
|
|
|
|
|
|
|
|
|
# Needed for to accept autosuggestions in vi insert mode
|
|
|
def accept_in_vi_insert_mode(event: KeyPressEvent):
|
|
|
"""Apply autosuggestion if at end of line."""
|
|
|
buffer = event.current_buffer
|
|
|
d = buffer.document
|
|
|
after_cursor = d.text[d.cursor_position :]
|
|
|
lines = after_cursor.split("\n")
|
|
|
end_of_current_line = lines[0].strip()
|
|
|
suggestion = buffer.suggestion
|
|
|
if (suggestion is not None) and (suggestion.text) and (end_of_current_line == ""):
|
|
|
buffer.insert_text(suggestion.text)
|
|
|
else:
|
|
|
nc.end_of_line(event)
|
|
|
|
|
|
|
|
|
def accept(event: KeyPressEvent):
|
|
|
"""Accept autosuggestion"""
|
|
|
buffer = event.current_buffer
|
|
|
suggestion = buffer.suggestion
|
|
|
if suggestion:
|
|
|
buffer.insert_text(suggestion.text)
|
|
|
else:
|
|
|
nc.forward_char(event)
|
|
|
|
|
|
|
|
|
def discard(event: KeyPressEvent):
|
|
|
"""Discard autosuggestion"""
|
|
|
buffer = event.current_buffer
|
|
|
buffer.suggestion = None
|
|
|
|
|
|
|
|
|
def accept_word(event: KeyPressEvent):
|
|
|
"""Fill partial autosuggestion by word"""
|
|
|
buffer = event.current_buffer
|
|
|
suggestion = buffer.suggestion
|
|
|
if suggestion:
|
|
|
t = re.split(r"(\S+\s+)", suggestion.text)
|
|
|
buffer.insert_text(next((x for x in t if x), ""))
|
|
|
else:
|
|
|
nc.forward_word(event)
|
|
|
|
|
|
|
|
|
def accept_character(event: KeyPressEvent):
|
|
|
"""Fill partial autosuggestion by character"""
|
|
|
b = event.current_buffer
|
|
|
suggestion = b.suggestion
|
|
|
if suggestion and suggestion.text:
|
|
|
b.insert_text(suggestion.text[0])
|
|
|
|
|
|
|
|
|
def accept_and_keep_cursor(event: KeyPressEvent):
|
|
|
"""Accept autosuggestion and keep cursor in place"""
|
|
|
buffer = event.current_buffer
|
|
|
old_position = buffer.cursor_position
|
|
|
suggestion = buffer.suggestion
|
|
|
if suggestion:
|
|
|
buffer.insert_text(suggestion.text)
|
|
|
buffer.cursor_position = old_position
|
|
|
|
|
|
|
|
|
def accept_and_move_cursor_left(event: KeyPressEvent):
|
|
|
"""Accept autosuggestion and move cursor left in place"""
|
|
|
accept_and_keep_cursor(event)
|
|
|
nc.backward_char(event)
|
|
|
|
|
|
|
|
|
def _update_hint(buffer: Buffer):
|
|
|
if buffer.auto_suggest:
|
|
|
suggestion = buffer.auto_suggest.get_suggestion(buffer, buffer.document)
|
|
|
buffer.suggestion = suggestion
|
|
|
|
|
|
|
|
|
def backspace_and_resume_hint(event: KeyPressEvent):
|
|
|
"""Resume autosuggestions after deleting last character"""
|
|
|
current_buffer = event.current_buffer
|
|
|
|
|
|
def resume_hinting(buffer: Buffer):
|
|
|
_update_hint(buffer)
|
|
|
current_buffer.on_text_changed.remove_handler(resume_hinting)
|
|
|
|
|
|
current_buffer.on_text_changed.add_handler(resume_hinting)
|
|
|
nc.backward_delete_char(event)
|
|
|
|
|
|
|
|
|
def up_and_update_hint(event: KeyPressEvent):
|
|
|
"""Go up and update hint"""
|
|
|
current_buffer = event.current_buffer
|
|
|
|
|
|
current_buffer.auto_up(count=event.arg)
|
|
|
_update_hint(current_buffer)
|
|
|
|
|
|
|
|
|
def down_and_update_hint(event: KeyPressEvent):
|
|
|
"""Go down and update hint"""
|
|
|
current_buffer = event.current_buffer
|
|
|
|
|
|
current_buffer.auto_down(count=event.arg)
|
|
|
_update_hint(current_buffer)
|
|
|
|
|
|
|
|
|
def accept_token(event: KeyPressEvent):
|
|
|
"""Fill partial autosuggestion by token"""
|
|
|
b = event.current_buffer
|
|
|
suggestion = b.suggestion
|
|
|
|
|
|
if suggestion:
|
|
|
prefix = _get_query(b.document)
|
|
|
text = prefix + suggestion.text
|
|
|
|
|
|
tokens: List[Optional[str]] = [None, None, None]
|
|
|
substrings = [""]
|
|
|
i = 0
|
|
|
|
|
|
for token in generate_tokens(StringIO(text).readline):
|
|
|
if token.type == tokenize.NEWLINE:
|
|
|
index = len(text)
|
|
|
else:
|
|
|
index = text.index(token[1], len(substrings[-1]))
|
|
|
substrings.append(text[:index])
|
|
|
tokenized_so_far = substrings[-1]
|
|
|
if tokenized_so_far.startswith(prefix):
|
|
|
if i == 0 and len(tokenized_so_far) > len(prefix):
|
|
|
tokens[0] = tokenized_so_far[len(prefix) :]
|
|
|
substrings.append(tokenized_so_far)
|
|
|
i += 1
|
|
|
tokens[i] = token[1]
|
|
|
if i == 2:
|
|
|
break
|
|
|
i += 1
|
|
|
|
|
|
if tokens[0]:
|
|
|
to_insert: str
|
|
|
insert_text = substrings[-2]
|
|
|
if tokens[1] and len(tokens[1]) == 1:
|
|
|
insert_text = substrings[-1]
|
|
|
to_insert = insert_text[len(prefix) :]
|
|
|
b.insert_text(to_insert)
|
|
|
return
|
|
|
|
|
|
nc.forward_word(event)
|
|
|
|
|
|
|
|
|
Provider = Union[AutoSuggestFromHistory, NavigableAutoSuggestFromHistory, None]
|
|
|
|
|
|
|
|
|
def _swap_autosuggestion(
|
|
|
buffer: Buffer,
|
|
|
provider: NavigableAutoSuggestFromHistory,
|
|
|
direction_method: Callable,
|
|
|
):
|
|
|
"""
|
|
|
We skip most recent history entry (in either direction) if it equals the
|
|
|
current autosuggestion because if user cycles when auto-suggestion is shown
|
|
|
they most likely want something else than what was suggested (otherwise
|
|
|
they would have accepted the suggestion).
|
|
|
"""
|
|
|
suggestion = buffer.suggestion
|
|
|
if not suggestion:
|
|
|
return
|
|
|
|
|
|
query = _get_query(buffer.document)
|
|
|
current = query + suggestion.text
|
|
|
|
|
|
direction_method(query=query, other_than=current, history=buffer.history)
|
|
|
|
|
|
new_suggestion = provider.get_suggestion(buffer, buffer.document)
|
|
|
buffer.suggestion = new_suggestion
|
|
|
|
|
|
|
|
|
def swap_autosuggestion_up(provider: Provider):
|
|
|
def swap_autosuggestion_up(event: KeyPressEvent):
|
|
|
"""Get next autosuggestion from history."""
|
|
|
if not isinstance(provider, NavigableAutoSuggestFromHistory):
|
|
|
return
|
|
|
|
|
|
return _swap_autosuggestion(
|
|
|
buffer=event.current_buffer, provider=provider, direction_method=provider.up
|
|
|
)
|
|
|
|
|
|
swap_autosuggestion_up.__name__ = "swap_autosuggestion_up"
|
|
|
return swap_autosuggestion_up
|
|
|
|
|
|
|
|
|
def swap_autosuggestion_down(
|
|
|
provider: Union[AutoSuggestFromHistory, NavigableAutoSuggestFromHistory, None]
|
|
|
):
|
|
|
def swap_autosuggestion_down(event: KeyPressEvent):
|
|
|
"""Get previous autosuggestion from history."""
|
|
|
if not isinstance(provider, NavigableAutoSuggestFromHistory):
|
|
|
return
|
|
|
|
|
|
return _swap_autosuggestion(
|
|
|
buffer=event.current_buffer,
|
|
|
provider=provider,
|
|
|
direction_method=provider.down,
|
|
|
)
|
|
|
|
|
|
swap_autosuggestion_down.__name__ = "swap_autosuggestion_down"
|
|
|
return swap_autosuggestion_down
|
|
|
|