auto_suggest.py
378 lines
| 12.7 KiB
| text/x-python
|
PythonLexer
krassowski
|
r28014 | import re | ||
import tokenize | ||||
from io import StringIO | ||||
Matthias Bussonnier
|
r28031 | from typing import Callable, List, Optional, Union, Generator, Tuple, Sequence | ||
krassowski
|
r28014 | |||
from prompt_toolkit.buffer import Buffer | ||||
from prompt_toolkit.key_binding import KeyPressEvent | ||||
from prompt_toolkit.key_binding.bindings import named_commands as nc | ||||
from prompt_toolkit.auto_suggest import AutoSuggestFromHistory, Suggestion | ||||
from prompt_toolkit.document import Document | ||||
from prompt_toolkit.history import History | ||||
from prompt_toolkit.shortcuts import PromptSession | ||||
krassowski
|
r28041 | from prompt_toolkit.layout.processors import ( | ||
Processor, | ||||
Transformation, | ||||
TransformationInput, | ||||
) | ||||
krassowski
|
r28014 | |||
from IPython.utils.tokenutil import generate_tokens | ||||
def _get_query(document: Document): | ||||
krassowski
|
r28041 | return document.lines[document.cursor_position_row] | ||
class AppendAutoSuggestionInAnyLine(Processor): | ||||
""" | ||||
Append the auto suggestion to lines other than the last (appending to the | ||||
last line is natively supported by the prompt toolkit). | ||||
""" | ||||
def __init__(self, style: str = "class:auto-suggestion") -> None: | ||||
self.style = style | ||||
def apply_transformation(self, ti: TransformationInput) -> Transformation: | ||||
is_last_line = ti.lineno == ti.document.line_count - 1 | ||||
is_active_line = ti.lineno == ti.document.cursor_position_row | ||||
if not is_last_line and is_active_line: | ||||
buffer = ti.buffer_control.buffer | ||||
if buffer.suggestion and ti.document.is_cursor_at_the_end_of_line: | ||||
suggestion = buffer.suggestion.text | ||||
else: | ||||
suggestion = "" | ||||
return Transformation(fragments=ti.fragments + [(self.style, suggestion)]) | ||||
else: | ||||
return Transformation(fragments=ti.fragments) | ||||
krassowski
|
r28014 | |||
class NavigableAutoSuggestFromHistory(AutoSuggestFromHistory): | ||||
Matthias Bussonnier
|
r28028 | """ | ||
A subclass of AutoSuggestFromHistory that allow navigation to next/previous | ||||
suggestion from history. To do so it remembers the current position, but it | ||||
state need to carefully be cleared on the right events. | ||||
""" | ||||
krassowski
|
r28014 | |||
def __init__( | ||||
self, | ||||
): | ||||
self.skip_lines = 0 | ||||
self._connected_apps = [] | ||||
def reset_history_position(self, _: Buffer): | ||||
self.skip_lines = 0 | ||||
def disconnect(self): | ||||
for pt_app in self._connected_apps: | ||||
text_insert_event = pt_app.default_buffer.on_text_insert | ||||
text_insert_event.remove_handler(self.reset_history_position) | ||||
def connect(self, pt_app: PromptSession): | ||||
self._connected_apps.append(pt_app) | ||||
krassowski
|
r28015 | # note: `on_text_changed` could be used for a bit different behaviour | ||
# on character deletion (i.e. reseting history position on backspace) | ||||
krassowski
|
r28014 | pt_app.default_buffer.on_text_insert.add_handler(self.reset_history_position) | ||
Matthias Bussonnier
|
r28050 | pt_app.default_buffer.on_cursor_position_changed.add_handler(self._dismiss) | ||
krassowski
|
r28014 | |||
def get_suggestion( | ||||
self, buffer: Buffer, document: Document | ||||
) -> Optional[Suggestion]: | ||||
text = _get_query(document) | ||||
if text.strip(): | ||||
for suggestion, _ in self._find_next_match( | ||||
text, self.skip_lines, buffer.history | ||||
): | ||||
return Suggestion(suggestion) | ||||
return None | ||||
Matthias Bussonnier
|
r28050 | def _dismiss(self, buffer, *args, **kwargs): | ||
buffer.suggestion = None | ||||
krassowski
|
r28014 | def _find_match( | ||
self, text: str, skip_lines: float, history: History, previous: bool | ||||
Matthias Bussonnier
|
r28028 | ) -> Generator[Tuple[str, float], None, None]: | ||
""" | ||||
Matthias Bussonnier
|
r28031 | text : str | ||
Text content to find a match for, the user cursor is most of the | ||||
time at the end of this text. | ||||
skip_lines : float | ||||
number of items to skip in the search, this is used to indicate how | ||||
far in the list the user has navigated by pressing up or down. | ||||
The float type is used as the base value is +inf | ||||
history : History | ||||
prompt_toolkit History instance to fetch previous entries from. | ||||
previous : bool | ||||
Direction of the search, whether we are looking previous match | ||||
(True), or next match (False). | ||||
Matthias Bussonnier
|
r28028 | |||
Yields | ||||
------ | ||||
Tuple with: | ||||
str: | ||||
current suggestion. | ||||
float: | ||||
will actually yield only ints, which is passed back via skip_lines, | ||||
which may be a +inf (float) | ||||
""" | ||||
krassowski
|
r28014 | line_number = -1 | ||
for string in reversed(list(history.get_strings())): | ||||
for line in reversed(string.splitlines()): | ||||
line_number += 1 | ||||
if not previous and line_number < skip_lines: | ||||
continue | ||||
# do not return empty suggestions as these | ||||
# close the auto-suggestion overlay (and are useless) | ||||
if line.startswith(text) and len(line) > len(text): | ||||
yield line[len(text) :], line_number | ||||
if previous and line_number >= skip_lines: | ||||
return | ||||
Matthias Bussonnier
|
r28031 | def _find_next_match( | ||
self, text: str, skip_lines: float, history: History | ||||
) -> Generator[Tuple[str, float], None, None]: | ||||
krassowski
|
r28014 | return self._find_match(text, skip_lines, history, previous=False) | ||
def _find_previous_match(self, text: str, skip_lines: float, history: History): | ||||
return reversed( | ||||
list(self._find_match(text, skip_lines, history, previous=True)) | ||||
) | ||||
Matthias Bussonnier
|
r28031 | def up(self, query: str, other_than: str, history: History) -> None: | ||
krassowski
|
r28014 | for suggestion, line_number in self._find_next_match( | ||
query, self.skip_lines, history | ||||
): | ||||
# if user has history ['very.a', 'very', 'very.b'] and typed 'very' | ||||
Michał Krassowski
|
r28020 | # we want to switch from 'very.b' to 'very.a' because a) if the | ||
# suggestion equals current text, prompt-toolkit aborts suggesting | ||||
krassowski
|
r28014 | # b) user likely would not be interested in 'very' anyways (they | ||
# already typed it). | ||||
if query + suggestion != other_than: | ||||
self.skip_lines = line_number | ||||
break | ||||
else: | ||||
# no matches found, cycle back to beginning | ||||
self.skip_lines = 0 | ||||
Matthias Bussonnier
|
r28031 | def down(self, query: str, other_than: str, history: History) -> None: | ||
krassowski
|
r28014 | for suggestion, line_number in self._find_previous_match( | ||
query, self.skip_lines, history | ||||
): | ||||
if query + suggestion != other_than: | ||||
self.skip_lines = line_number | ||||
break | ||||
else: | ||||
# no matches found, cycle to end | ||||
for suggestion, line_number in self._find_previous_match( | ||||
query, float("Inf"), history | ||||
): | ||||
if query + suggestion != other_than: | ||||
self.skip_lines = line_number | ||||
break | ||||
# Needed for to accept autosuggestions in vi insert mode | ||||
def accept_in_vi_insert_mode(event: KeyPressEvent): | ||||
"""Apply autosuggestion if at end of line.""" | ||||
krassowski
|
r28015 | buffer = event.current_buffer | ||
d = buffer.document | ||||
krassowski
|
r28014 | after_cursor = d.text[d.cursor_position :] | ||
lines = after_cursor.split("\n") | ||||
end_of_current_line = lines[0].strip() | ||||
krassowski
|
r28015 | suggestion = buffer.suggestion | ||
krassowski
|
r28014 | if (suggestion is not None) and (suggestion.text) and (end_of_current_line == ""): | ||
krassowski
|
r28015 | buffer.insert_text(suggestion.text) | ||
krassowski
|
r28014 | else: | ||
nc.end_of_line(event) | ||||
def accept(event: KeyPressEvent): | ||||
"""Accept autosuggestion""" | ||||
krassowski
|
r28015 | buffer = event.current_buffer | ||
suggestion = buffer.suggestion | ||||
krassowski
|
r28014 | if suggestion: | ||
krassowski
|
r28015 | buffer.insert_text(suggestion.text) | ||
krassowski
|
r28014 | else: | ||
nc.forward_char(event) | ||||
krassowski
|
r28040 | def discard(event: KeyPressEvent): | ||
"""Discard autosuggestion""" | ||||
buffer = event.current_buffer | ||||
buffer.suggestion = None | ||||
krassowski
|
r28014 | def accept_word(event: KeyPressEvent): | ||
"""Fill partial autosuggestion by word""" | ||||
krassowski
|
r28015 | buffer = event.current_buffer | ||
suggestion = buffer.suggestion | ||||
krassowski
|
r28014 | if suggestion: | ||
t = re.split(r"(\S+\s+)", suggestion.text) | ||||
krassowski
|
r28015 | buffer.insert_text(next((x for x in t if x), "")) | ||
krassowski
|
r28014 | else: | ||
nc.forward_word(event) | ||||
def accept_character(event: KeyPressEvent): | ||||
"""Fill partial autosuggestion by character""" | ||||
b = event.current_buffer | ||||
suggestion = b.suggestion | ||||
if suggestion and suggestion.text: | ||||
b.insert_text(suggestion.text[0]) | ||||
krassowski
|
r28015 | def accept_and_keep_cursor(event: KeyPressEvent): | ||
"""Accept autosuggestion and keep cursor in place""" | ||||
buffer = event.current_buffer | ||||
old_position = buffer.cursor_position | ||||
suggestion = buffer.suggestion | ||||
if suggestion: | ||||
buffer.insert_text(suggestion.text) | ||||
buffer.cursor_position = old_position | ||||
krassowski
|
r28016 | |||
def accept_and_move_cursor_left(event: KeyPressEvent): | ||||
krassowski
|
r28017 | """Accept autosuggestion and move cursor left in place""" | ||
krassowski
|
r28016 | accept_and_keep_cursor(event) | ||
nc.backward_char(event) | ||||
krassowski
|
r28015 | |||
krassowski
|
r28041 | def _update_hint(buffer: Buffer): | ||
if buffer.auto_suggest: | ||||
suggestion = buffer.auto_suggest.get_suggestion(buffer, buffer.document) | ||||
buffer.suggestion = suggestion | ||||
krassowski
|
r28015 | def backspace_and_resume_hint(event: KeyPressEvent): | ||
"""Resume autosuggestions after deleting last character""" | ||||
current_buffer = event.current_buffer | ||||
def resume_hinting(buffer: Buffer): | ||||
krassowski
|
r28041 | _update_hint(buffer) | ||
krassowski
|
r28015 | current_buffer.on_text_changed.remove_handler(resume_hinting) | ||
current_buffer.on_text_changed.add_handler(resume_hinting) | ||||
nc.backward_delete_char(event) | ||||
krassowski
|
r28041 | def up_and_update_hint(event: KeyPressEvent): | ||
"""Go up and update hint""" | ||||
current_buffer = event.current_buffer | ||||
current_buffer.auto_up(count=event.arg) | ||||
_update_hint(current_buffer) | ||||
def down_and_update_hint(event: KeyPressEvent): | ||||
"""Go down and update hint""" | ||||
current_buffer = event.current_buffer | ||||
current_buffer.auto_down(count=event.arg) | ||||
_update_hint(current_buffer) | ||||
krassowski
|
r28014 | def accept_token(event: KeyPressEvent): | ||
"""Fill partial autosuggestion by token""" | ||||
b = event.current_buffer | ||||
suggestion = b.suggestion | ||||
if suggestion: | ||||
prefix = _get_query(b.document) | ||||
text = prefix + suggestion.text | ||||
tokens: List[Optional[str]] = [None, None, None] | ||||
substrings = [""] | ||||
i = 0 | ||||
for token in generate_tokens(StringIO(text).readline): | ||||
if token.type == tokenize.NEWLINE: | ||||
index = len(text) | ||||
else: | ||||
index = text.index(token[1], len(substrings[-1])) | ||||
substrings.append(text[:index]) | ||||
tokenized_so_far = substrings[-1] | ||||
if tokenized_so_far.startswith(prefix): | ||||
if i == 0 and len(tokenized_so_far) > len(prefix): | ||||
tokens[0] = tokenized_so_far[len(prefix) :] | ||||
substrings.append(tokenized_so_far) | ||||
i += 1 | ||||
tokens[i] = token[1] | ||||
if i == 2: | ||||
break | ||||
i += 1 | ||||
if tokens[0]: | ||||
to_insert: str | ||||
insert_text = substrings[-2] | ||||
if tokens[1] and len(tokens[1]) == 1: | ||||
insert_text = substrings[-1] | ||||
to_insert = insert_text[len(prefix) :] | ||||
b.insert_text(to_insert) | ||||
return | ||||
nc.forward_word(event) | ||||
Provider = Union[AutoSuggestFromHistory, NavigableAutoSuggestFromHistory, None] | ||||
def _swap_autosuggestion( | ||||
buffer: Buffer, | ||||
provider: NavigableAutoSuggestFromHistory, | ||||
direction_method: Callable, | ||||
): | ||||
""" | ||||
We skip most recent history entry (in either direction) if it equals the | ||||
current autosuggestion because if user cycles when auto-suggestion is shown | ||||
Matthias Bussonnier
|
r28029 | they most likely want something else than what was suggested (otherwise | ||
krassowski
|
r28014 | they would have accepted the suggestion). | ||
""" | ||||
suggestion = buffer.suggestion | ||||
if not suggestion: | ||||
return | ||||
query = _get_query(buffer.document) | ||||
current = query + suggestion.text | ||||
direction_method(query=query, other_than=current, history=buffer.history) | ||||
new_suggestion = provider.get_suggestion(buffer, buffer.document) | ||||
buffer.suggestion = new_suggestion | ||||
def swap_autosuggestion_up(provider: Provider): | ||||
def swap_autosuggestion_up(event: KeyPressEvent): | ||||
"""Get next autosuggestion from history.""" | ||||
if not isinstance(provider, NavigableAutoSuggestFromHistory): | ||||
return | ||||
return _swap_autosuggestion( | ||||
buffer=event.current_buffer, provider=provider, direction_method=provider.up | ||||
) | ||||
swap_autosuggestion_up.__name__ = "swap_autosuggestion_up" | ||||
return swap_autosuggestion_up | ||||
def swap_autosuggestion_down( | ||||
provider: Union[AutoSuggestFromHistory, NavigableAutoSuggestFromHistory, None] | ||||
): | ||||
def swap_autosuggestion_down(event: KeyPressEvent): | ||||
"""Get previous autosuggestion from history.""" | ||||
if not isinstance(provider, NavigableAutoSuggestFromHistory): | ||||
return | ||||
return _swap_autosuggestion( | ||||
buffer=event.current_buffer, | ||||
provider=provider, | ||||
direction_method=provider.down, | ||||
) | ||||
swap_autosuggestion_down.__name__ = "swap_autosuggestion_down" | ||||
return swap_autosuggestion_down | ||||