mirror of
https://github.com/kovidgoyal/kitty.git
synced 2026-02-01 11:34:59 +01:00
348 lines
11 KiB
Python
348 lines
11 KiB
Python
#!/usr/bin/env python
|
|
# License: GPL v3 Copyright: 2018, Kovid Goyal <kovid at kovidgoyal.net>
|
|
|
|
|
|
import os
|
|
from collections import deque
|
|
from collections.abc import Callable, Sequence
|
|
from contextlib import suppress
|
|
from types import TracebackType
|
|
from typing import TYPE_CHECKING, Any, ContextManager, Deque, NamedTuple, Optional, cast
|
|
|
|
from kitty.constants import kitten_exe, running_in_kitty
|
|
from kitty.fast_data_types import monotonic, safe_pipe
|
|
from kitty.types import DecoratedFunc, ParsedShortcut
|
|
from kitty.typing import (
|
|
AbstractEventLoop,
|
|
BossType,
|
|
Debug,
|
|
ImageManagerType,
|
|
KeyActionType,
|
|
KeyEventType,
|
|
LoopType,
|
|
MouseButton,
|
|
MouseEvent,
|
|
ScreenSize,
|
|
TermManagerType,
|
|
WindowType,
|
|
)
|
|
|
|
from .operations import MouseTracking, pending_update
|
|
|
|
if TYPE_CHECKING:
|
|
from kitty.file_transmission import FileTransmissionCommand
|
|
|
|
|
|
OpenUrlHandler = Optional[Callable[[BossType, WindowType, str, int, str], bool]]
|
|
|
|
|
|
class ButtonEvent(NamedTuple):
|
|
mouse_event: MouseEvent
|
|
timestamp: float
|
|
|
|
|
|
def is_click(a: ButtonEvent, b: ButtonEvent) -> bool:
|
|
from .loop import EventType
|
|
if a.mouse_event.type is not EventType.PRESS or b.mouse_event.type is not EventType.RELEASE:
|
|
return False
|
|
x = a.mouse_event.cell_x - b.mouse_event.cell_x
|
|
y = a.mouse_event.cell_y - b.mouse_event.cell_y
|
|
return x*x + y*y <= 4
|
|
|
|
|
|
class KittenUI:
|
|
allow_remote_control: bool = False
|
|
remote_control_password: bool | str = False
|
|
|
|
def __init__(self, func: Callable[[list[str]], str], allow_remote_control: bool, remote_control_password: bool | str):
|
|
self.func = func
|
|
self.allow_remote_control = allow_remote_control
|
|
self.remote_control_password = remote_control_password
|
|
self.password = self.to = ''
|
|
self.rc_fd = -1
|
|
self.initialized = False
|
|
|
|
def initialize(self) -> None:
|
|
if self.initialized:
|
|
return
|
|
self.initialized = True
|
|
if running_in_kitty():
|
|
return
|
|
if self.allow_remote_control:
|
|
self.to = os.environ.get('KITTY_LISTEN_ON', '')
|
|
self.rc_fd = int(self.to.partition(':')[-1])
|
|
os.set_inheritable(self.rc_fd, False)
|
|
if (self.remote_control_password or self.remote_control_password == '') and not self.password:
|
|
import socket
|
|
with socket.fromfd(self.rc_fd, socket.AF_UNIX, socket.SOCK_STREAM) as s:
|
|
data = s.recv(256)
|
|
if not data.endswith(b'\n'):
|
|
raise Exception(f'The remote control password was invalid: {data!r}')
|
|
self.password = data.strip().decode()
|
|
|
|
def __call__(self, args: list[str]) -> str:
|
|
self.initialize()
|
|
return self.func(args)
|
|
|
|
def allow_indiscriminate_remote_control(self, enable: bool = True) -> None:
|
|
if self.rc_fd > -1:
|
|
if enable:
|
|
os.set_inheritable(self.rc_fd, True)
|
|
if self.password:
|
|
os.environ['KITTY_RC_PASSWORD'] = self.password
|
|
else:
|
|
os.set_inheritable(self.rc_fd, False)
|
|
if self.password:
|
|
os.environ.pop('KITTY_RC_PASSWORD', None)
|
|
|
|
def remote_control(self, cmd: str | Sequence[str], **kw: Any) -> Any:
|
|
if not self.allow_remote_control:
|
|
raise ValueError('Remote control is not enabled, remember to use allow_remote_control=True')
|
|
prefix = [kitten_exe(), '@']
|
|
r = -1
|
|
pass_fds = list(kw.get('pass_fds') or ())
|
|
try:
|
|
if self.rc_fd > -1:
|
|
pass_fds.append(self.rc_fd)
|
|
if self.password and self.rc_fd > -1:
|
|
r, w = safe_pipe(False)
|
|
os.write(w, self.password.encode())
|
|
os.close(w)
|
|
prefix += ['--password-file', f'fd:{r}', '--use-password', 'always']
|
|
pass_fds.append(r)
|
|
if pass_fds:
|
|
kw['pass_fds'] = tuple(pass_fds)
|
|
if isinstance(cmd, str):
|
|
cmd = ' '.join(prefix)
|
|
else:
|
|
cmd = prefix + list(cmd)
|
|
import subprocess
|
|
if self.rc_fd > -1:
|
|
is_inheritable = os.get_inheritable(self.rc_fd)
|
|
if not is_inheritable:
|
|
os.set_inheritable(self.rc_fd, True)
|
|
try:
|
|
return subprocess.run(cmd, **kw)
|
|
finally:
|
|
if self.rc_fd > -1 and not is_inheritable:
|
|
os.set_inheritable(self.rc_fd, False)
|
|
finally:
|
|
if r > -1:
|
|
os.close(r)
|
|
|
|
|
|
def kitten_ui(
|
|
allow_remote_control: bool = KittenUI.allow_remote_control,
|
|
remote_control_password: bool | str = KittenUI.allow_remote_control,
|
|
) -> Callable[[Callable[[list[str]], str]], KittenUI]:
|
|
|
|
def wrapper(impl: Callable[..., Any]) -> KittenUI:
|
|
return KittenUI(impl, allow_remote_control, remote_control_password)
|
|
|
|
return wrapper
|
|
|
|
|
|
class Handler:
|
|
|
|
image_manager_class: type[ImageManagerType] | None = None
|
|
use_alternate_screen = True
|
|
mouse_tracking = MouseTracking.none
|
|
terminal_io_ended = False
|
|
overlay_ready_report_needed = False
|
|
|
|
def _initialize(
|
|
self,
|
|
screen_size: ScreenSize,
|
|
term_manager: TermManagerType,
|
|
schedule_write: Callable[[bytes], None],
|
|
tui_loop: LoopType,
|
|
debug: Debug,
|
|
image_manager: ImageManagerType | None = None
|
|
) -> None:
|
|
from .operations import commander
|
|
self.screen_size = screen_size
|
|
self._term_manager = term_manager
|
|
self._tui_loop = tui_loop
|
|
self._schedule_write = schedule_write
|
|
self.debug = debug
|
|
self.cmd = commander(self)
|
|
self._image_manager = image_manager
|
|
self._button_events: dict[MouseButton, Deque[ButtonEvent]] = {}
|
|
|
|
@property
|
|
def image_manager(self) -> ImageManagerType:
|
|
assert self._image_manager is not None
|
|
return self._image_manager
|
|
|
|
@property
|
|
def asyncio_loop(self) -> AbstractEventLoop:
|
|
return self._tui_loop.asyncio_loop
|
|
|
|
def add_shortcut(self, action: KeyActionType, spec: str | ParsedShortcut) -> None:
|
|
if not hasattr(self, '_key_shortcuts'):
|
|
self._key_shortcuts: dict[ParsedShortcut, KeyActionType] = {}
|
|
if isinstance(spec, str):
|
|
from kitty.key_encoding import parse_shortcut
|
|
spec = parse_shortcut(spec)
|
|
self._key_shortcuts[spec] = action
|
|
|
|
def shortcut_action(self, key_event: KeyEventType) -> KeyActionType | None:
|
|
for sc, action in self._key_shortcuts.items():
|
|
if key_event.matches(sc):
|
|
return action
|
|
return None
|
|
|
|
def __enter__(self) -> None:
|
|
if self._image_manager is not None:
|
|
self._image_manager.__enter__()
|
|
self.debug.fobj = self
|
|
self.initialize()
|
|
|
|
def __exit__(self, etype: type, value: Exception, tb: TracebackType) -> None:
|
|
del self.debug.fobj
|
|
with suppress(Exception):
|
|
self.finalize()
|
|
if self._image_manager is not None:
|
|
self._image_manager.__exit__(etype, value, tb)
|
|
|
|
def initialize(self) -> None:
|
|
pass
|
|
|
|
def finalize(self) -> None:
|
|
pass
|
|
|
|
def on_resize(self, screen_size: ScreenSize) -> None:
|
|
self.screen_size = screen_size
|
|
|
|
def quit_loop(self, return_code: int | None = None) -> None:
|
|
self._tui_loop.quit(return_code)
|
|
|
|
def on_term(self) -> None:
|
|
self._tui_loop.quit(1)
|
|
|
|
def on_hup(self) -> None:
|
|
self.terminal_io_ended = True
|
|
self._tui_loop.quit(1)
|
|
|
|
def on_key_event(self, key_event: KeyEventType, in_bracketed_paste: bool = False) -> None:
|
|
' Override this method and perform_default_key_action() to handle all key events '
|
|
if key_event.text:
|
|
self.on_text(key_event.text, in_bracketed_paste)
|
|
else:
|
|
self.on_key(key_event)
|
|
|
|
def perform_default_key_action(self, key_event: KeyEventType) -> bool:
|
|
' Override in sub-class if you want to handle these key events yourself '
|
|
if key_event.matches('ctrl+c'):
|
|
self.on_interrupt()
|
|
return True
|
|
if key_event.matches('ctrl+d'):
|
|
self.on_eot()
|
|
return True
|
|
return False
|
|
|
|
def on_text(self, text: str, in_bracketed_paste: bool = False) -> None:
|
|
pass
|
|
|
|
def on_key(self, key_event: KeyEventType) -> None:
|
|
pass
|
|
|
|
def on_mouse_event(self, mouse_event: MouseEvent) -> None:
|
|
from .loop import EventType
|
|
if mouse_event.type is EventType.MOVE:
|
|
self.on_mouse_move(mouse_event)
|
|
elif mouse_event.type is EventType.PRESS:
|
|
q = self._button_events.setdefault(mouse_event.buttons, deque())
|
|
q.append(ButtonEvent(mouse_event, monotonic()))
|
|
if len(q) > 5:
|
|
q.popleft()
|
|
elif mouse_event.type is EventType.RELEASE:
|
|
q = self._button_events.setdefault(mouse_event.buttons, deque())
|
|
q.append(ButtonEvent(mouse_event, monotonic()))
|
|
if len(q) > 5:
|
|
q.popleft()
|
|
if len(q) > 1 and is_click(q[-2], q[-1]):
|
|
self.on_click(mouse_event)
|
|
|
|
def on_mouse_move(self, mouse_event: MouseEvent) -> None:
|
|
pass
|
|
|
|
def on_click(self, mouse_event: MouseEvent) -> None:
|
|
pass
|
|
|
|
def on_interrupt(self) -> None:
|
|
pass
|
|
|
|
def on_eot(self) -> None:
|
|
pass
|
|
|
|
def on_writing_finished(self) -> None:
|
|
pass
|
|
|
|
def on_kitty_cmd_response(self, response: dict[str, Any]) -> None:
|
|
pass
|
|
|
|
def on_clipboard_response(self, text: str, from_primary: bool = False) -> None:
|
|
pass
|
|
|
|
def on_file_transfer_response(self, ftc: 'FileTransmissionCommand') -> None:
|
|
pass
|
|
|
|
def on_capability_response(self, name: str, val: str) -> None:
|
|
pass
|
|
|
|
def write(self, data: bytes | str) -> None:
|
|
if isinstance(data, str):
|
|
data = data.encode('utf-8')
|
|
self._schedule_write(data)
|
|
|
|
def flush(self) -> None:
|
|
pass
|
|
|
|
def print(self, *args: object, sep: str = ' ', end: str = '\r\n') -> None:
|
|
data = sep.join(map(str, args)) + end
|
|
self.write(data)
|
|
|
|
def suspend(self) -> ContextManager[TermManagerType]:
|
|
return self._term_manager.suspend()
|
|
|
|
@classmethod
|
|
def atomic_update(cls, func: DecoratedFunc) -> DecoratedFunc:
|
|
from functools import wraps
|
|
|
|
@wraps(func)
|
|
def f(*a: Any, **kw: Any) -> Any:
|
|
with pending_update(a[0].write):
|
|
return func(*a, **kw)
|
|
return cast(DecoratedFunc, f)
|
|
|
|
|
|
class HandleResult:
|
|
|
|
type_of_input: str | None = None
|
|
no_ui: bool = False
|
|
|
|
def __init__(self, impl: Callable[..., Any], type_of_input: str | None, no_ui: bool, has_ready_notification: bool, open_url_handler: OpenUrlHandler):
|
|
self.impl = impl
|
|
self.no_ui = no_ui
|
|
self.type_of_input = type_of_input
|
|
self.has_ready_notification = has_ready_notification
|
|
self.open_url_handler = open_url_handler
|
|
|
|
def __call__(self, args: Sequence[str], data: Any, target_window_id: int, boss: BossType) -> Any:
|
|
return self.impl(args, data, target_window_id, boss)
|
|
|
|
|
|
|
|
def result_handler(
|
|
type_of_input: str | None = None,
|
|
no_ui: bool = False,
|
|
has_ready_notification: bool = Handler.overlay_ready_report_needed,
|
|
open_url_handler: OpenUrlHandler = None,
|
|
) -> Callable[[Callable[..., Any]], HandleResult]:
|
|
|
|
def wrapper(impl: Callable[..., Any]) -> HandleResult:
|
|
return HandleResult(impl, type_of_input, no_ui, has_ready_notification, open_url_handler)
|
|
|
|
return wrapper
|