mirror of
https://github.com/kovidgoyal/kitty.git
synced 2026-02-01 11:34:59 +01:00
This was needed to fix various corner cases when doing blending of colors in linear space. The new architecture has the same performance as the old in the common case of opaque rendering with no UI layers or images. In the case of only positive z-index images there is a performance decrease as the OS Window is now rendered to a offscreen texture and then blitted to screen. However, in the future when we move to Vulkan or I can figure out how to get Wayland to accept buffers with colors in linear space, this performance penalty can be removed. The performance penalty was not significant on my system but this is highly GPU dependent. Modern GPUs are supposedly optimised for rendering to offscreen buffers, so we will see. The awrit project might be a good test case. Now either we have 1-shot rendering for the case of opaque with only ext or all the various pieces are rendered in successive draw calls into an offscreen buffer that is blitted to the output buffer after all drawing is done. Fixes #8869
1131 lines
34 KiB
Python
1131 lines
34 KiB
Python
#!/usr/bin/env python
|
|
# License: GPL v3 Copyright: 2016, Kovid Goyal <kovid at kovidgoyal.net>
|
|
|
|
import fcntl
|
|
import math
|
|
import os
|
|
import re
|
|
import string
|
|
import sys
|
|
from collections.abc import Callable, Generator, Iterable, Iterator, Mapping, Sequence
|
|
from contextlib import contextmanager, suppress
|
|
from functools import lru_cache
|
|
from re import Match, Pattern
|
|
from typing import (
|
|
TYPE_CHECKING,
|
|
Any,
|
|
BinaryIO,
|
|
NamedTuple,
|
|
Optional,
|
|
cast,
|
|
)
|
|
|
|
from .constants import (
|
|
clear_handled_signals,
|
|
config_dir,
|
|
is_macos,
|
|
is_wayland,
|
|
kitten_exe,
|
|
runtime_dir,
|
|
shell_path,
|
|
ssh_control_master_template,
|
|
)
|
|
from .fast_data_types import WINDOW_FULLSCREEN, WINDOW_HIDDEN, WINDOW_MAXIMIZED, WINDOW_MINIMIZED, WINDOW_NORMAL, Color, Shlex, get_options, monotonic, open_tty
|
|
from .fast_data_types import timed_debug_print as _timed_debug_print
|
|
from .types import run_once
|
|
from .typing_compat import AddressFamily, PopenType, StartupCtx
|
|
|
|
if TYPE_CHECKING:
|
|
import tarfile
|
|
|
|
from .fast_data_types import OSWindowSize
|
|
from .options.types import Options
|
|
else:
|
|
Options = object
|
|
|
|
|
|
class Flag:
|
|
|
|
def __init__(self, initial_val: bool = True) -> None:
|
|
self.val = initial_val
|
|
|
|
def __enter__(self) -> None:
|
|
self.val ^= True
|
|
|
|
def __exit__(self, *a: object) -> None:
|
|
self.val ^= True
|
|
|
|
def __bool__(self) -> bool:
|
|
return self.val
|
|
|
|
|
|
disallow_expand_vars = Flag(False)
|
|
|
|
|
|
def expandvars(val: str, env: Mapping[str, str] = {}, fallback_to_os_env: bool = True) -> str:
|
|
'''
|
|
Expand $VAR and ${VAR} Use $$ for a literal $
|
|
'''
|
|
|
|
def sub(m: 'Match[str]') -> str:
|
|
key = m.group(1) or m.group(2)
|
|
result = env.get(key)
|
|
if result is None and fallback_to_os_env:
|
|
result = os.environ.get(key)
|
|
if result is None:
|
|
result = m.group()
|
|
return result
|
|
|
|
if disallow_expand_vars or '$' not in val:
|
|
return val
|
|
|
|
return re.sub(r'\$(?:(\w+)|\{([^}]+)\})', sub, val.replace('$$', '\0')).replace('\0', '$')
|
|
|
|
|
|
@lru_cache(maxsize=2)
|
|
def sgr_sanitizer_pat(for_splitting: bool = False) -> 're.Pattern[str]':
|
|
pat = '\033\\[.*?m'
|
|
if for_splitting:
|
|
return re.compile(f'({pat})')
|
|
return re.compile(pat)
|
|
|
|
|
|
@run_once
|
|
def kitty_ansi_sanitizer_pat() -> 're.Pattern[str]':
|
|
# removes ANSI sequences generated by kitty's ANSI output routines. Not
|
|
# suitable for stripping general ANSI sequences
|
|
return re.compile(r'\x1b(?:\[[0-9;:]*?m|\].*?\x1b\\)')
|
|
|
|
|
|
def platform_window_id(os_window_id: int) -> int | None:
|
|
if is_macos:
|
|
from .fast_data_types import cocoa_window_id
|
|
with suppress(Exception):
|
|
return cocoa_window_id(os_window_id)
|
|
if not is_wayland():
|
|
from .fast_data_types import x11_window_id
|
|
with suppress(Exception):
|
|
return x11_window_id(os_window_id)
|
|
return None
|
|
|
|
|
|
def safe_print(*a: Any, **k: Any) -> None:
|
|
with suppress(Exception):
|
|
print(*a, **k)
|
|
|
|
|
|
def log_error(*a: Any, **k: str) -> None:
|
|
from .fast_data_types import log_error_string
|
|
output = getattr(log_error, 'redirect', log_error_string)
|
|
with suppress(Exception):
|
|
msg = k.get('sep', ' ').join(map(str, a)) + k.get('end', '')
|
|
output(msg)
|
|
|
|
|
|
@contextmanager
|
|
def suppress_error_logging() -> Iterator[None]:
|
|
before = getattr(log_error, 'redirect', suppress_error_logging)
|
|
setattr(log_error, 'redirect', lambda *a: None)
|
|
try:
|
|
yield
|
|
finally:
|
|
if before is suppress_error_logging:
|
|
delattr(log_error, 'redirect')
|
|
else:
|
|
setattr(log_error, 'redirect', before)
|
|
|
|
|
|
def ceil_int(x: float) -> int:
|
|
return int(math.ceil(x))
|
|
|
|
|
|
def sanitize_title(x: str) -> str:
|
|
return re.sub(r'\s+', ' ', re.sub(r'[\0-\x19\x80-\x9f]', '', x))
|
|
|
|
|
|
def color_as_int(val: Color) -> int:
|
|
return int(val) & 0xffffff
|
|
|
|
|
|
def color_from_int(val: int) -> Color:
|
|
return Color((val >> 16) & 0xFF, (val >> 8) & 0xFF, val & 0xFF)
|
|
|
|
|
|
class ScreenSize(NamedTuple):
|
|
rows: int
|
|
cols: int
|
|
width: int
|
|
height: int
|
|
cell_width: int
|
|
cell_height: int
|
|
|
|
|
|
def read_screen_size(fd: int = -1) -> ScreenSize:
|
|
import array
|
|
import fcntl
|
|
import termios
|
|
buf = array.array('H', [0, 0, 0, 0])
|
|
if fd < 0:
|
|
fd = sys.stdout.fileno()
|
|
fcntl.ioctl(fd, termios.TIOCGWINSZ, cast(bytearray, buf))
|
|
rows, cols, width, height = tuple(buf)
|
|
cell_width, cell_height = width // (cols or 1), height // (rows or 1)
|
|
return ScreenSize(rows, cols, width, height, cell_width, cell_height)
|
|
|
|
|
|
class ScreenSizeGetter:
|
|
changed = True
|
|
Size = ScreenSize
|
|
ans: ScreenSize | None = None
|
|
|
|
def __init__(self, fd: int | None):
|
|
if fd is None:
|
|
fd = sys.stdout.fileno()
|
|
self.fd = fd
|
|
|
|
def __call__(self) -> ScreenSize:
|
|
if self.changed:
|
|
self.ans = read_screen_size(self.fd)
|
|
self.changed = False
|
|
return cast(ScreenSize, self.ans)
|
|
|
|
|
|
@lru_cache(maxsize=64, typed=True)
|
|
def screen_size_function(fd: int | None = None) -> ScreenSizeGetter:
|
|
return ScreenSizeGetter(fd)
|
|
|
|
|
|
def fit_image(width: int, height: int, pwidth: int, pheight: int) -> tuple[int, int]:
|
|
from math import floor
|
|
if height > pheight:
|
|
corrf = pheight / float(height)
|
|
width, height = floor(corrf * width), pheight
|
|
if width > pwidth:
|
|
corrf = pwidth / float(width)
|
|
width, height = pwidth, floor(corrf * height)
|
|
if height > pheight:
|
|
corrf = pheight / float(height)
|
|
width, height = floor(corrf * width), pheight
|
|
|
|
return int(width), int(height)
|
|
|
|
|
|
def base64_encode(
|
|
integer: int,
|
|
chars: str = string.ascii_uppercase + string.ascii_lowercase + string.digits +
|
|
'+/'
|
|
) -> str:
|
|
ans = ''
|
|
while True:
|
|
integer, remainder = divmod(integer, 64)
|
|
ans = chars[remainder] + ans
|
|
if integer == 0:
|
|
break
|
|
return ans
|
|
|
|
|
|
def command_for_open(program: str | list[str] = 'default') -> list[str]:
|
|
if isinstance(program, str):
|
|
from .conf.utils import to_cmdline
|
|
program = to_cmdline(program)
|
|
if program == ['default']:
|
|
cmd = ['open'] if is_macos else ['xdg-open']
|
|
else:
|
|
cmd = program
|
|
return cmd
|
|
|
|
|
|
def open_cmd(cmd: Iterable[str] | list[str], arg: None | Iterable[str] | str = None,
|
|
cwd: str | None = None, extra_env: dict[str, str] | None = None) -> 'PopenType[bytes]':
|
|
import subprocess
|
|
if arg is not None:
|
|
cmd = list(cmd)
|
|
if isinstance(arg, str):
|
|
cmd.append(arg)
|
|
else:
|
|
cmd.extend(arg)
|
|
env: dict[str, str] | None = None
|
|
if extra_env:
|
|
env = os.environ.copy()
|
|
env.update(extra_env)
|
|
return subprocess.Popen(
|
|
tuple(cmd), stdin=subprocess.DEVNULL, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, cwd=cwd or None,
|
|
preexec_fn=clear_handled_signals, env=env)
|
|
|
|
|
|
def open_url(url: str, program: str | list[str] = 'default', cwd: str | None = None, extra_env: dict[str, str] | None = None) -> 'PopenType[bytes]':
|
|
return open_cmd(command_for_open(program), url, cwd=cwd, extra_env=extra_env)
|
|
|
|
|
|
def init_startup_notification_x11(window_handle: int, startup_id: str | None = None) -> Optional['StartupCtx']:
|
|
# https://specifications.freedesktop.org/startup-notification-spec/startup-notification-latest.txt
|
|
from kitty.fast_data_types import init_x11_startup_notification
|
|
sid = startup_id or os.environ.pop('DESKTOP_STARTUP_ID', None) # ensure child processes don't get this env var
|
|
if not sid:
|
|
return None
|
|
from .fast_data_types import x11_display
|
|
display = x11_display()
|
|
if not display:
|
|
return None
|
|
return init_x11_startup_notification(display, window_handle, sid)
|
|
|
|
|
|
def end_startup_notification_x11(ctx: 'StartupCtx') -> None:
|
|
from kitty.fast_data_types import end_x11_startup_notification
|
|
end_x11_startup_notification(ctx)
|
|
|
|
|
|
def init_startup_notification(window_handle: int | None, startup_id: str | None = None) -> Optional['StartupCtx']:
|
|
if is_macos or is_wayland():
|
|
return None
|
|
if window_handle is None:
|
|
log_error('Could not perform startup notification as window handle not present')
|
|
return None
|
|
try:
|
|
try:
|
|
return init_startup_notification_x11(window_handle, startup_id)
|
|
except OSError as e:
|
|
if not str(e).startswith("Failed to load libstartup-notification"):
|
|
raise e
|
|
log_error(
|
|
f'{e}. This has two main effects:',
|
|
'There will be no startup feedback and when using --single-instance, kitty windows may start on an incorrect desktop/workspace.')
|
|
except Exception:
|
|
import traceback
|
|
traceback.print_exc()
|
|
return None
|
|
|
|
|
|
def end_startup_notification(ctx: Optional['StartupCtx']) -> None:
|
|
if not ctx:
|
|
return
|
|
if is_macos or is_wayland():
|
|
return
|
|
try:
|
|
end_startup_notification_x11(ctx)
|
|
except Exception:
|
|
import traceback
|
|
traceback.print_exc()
|
|
|
|
|
|
class startup_notification_handler:
|
|
|
|
# WARNING: This only works on X11 on other platforms extra_callback will be called
|
|
# after the window is shown, not before, as they do not do two stage window
|
|
# creation.
|
|
|
|
def __init__(self, do_notify: bool = True, startup_id: str | None = None, extra_callback: Callable[[int], None] | None = None):
|
|
self.do_notify = do_notify
|
|
self.startup_id = startup_id
|
|
self.extra_callback = extra_callback
|
|
self.ctx: Optional['StartupCtx'] = None
|
|
|
|
def __enter__(self) -> Callable[[int], None]:
|
|
|
|
def pre_show_callback(window_handle: int) -> None:
|
|
if self.extra_callback is not None:
|
|
self.extra_callback(window_handle)
|
|
if self.do_notify:
|
|
self.ctx = init_startup_notification(window_handle, self.startup_id)
|
|
|
|
return pre_show_callback
|
|
|
|
def __exit__(self, *a: Any) -> None:
|
|
if self.ctx is not None:
|
|
end_startup_notification(self.ctx)
|
|
|
|
|
|
def unix_socket_directories() -> Iterator[str]:
|
|
import tempfile
|
|
home = os.path.expanduser('~')
|
|
candidates = [tempfile.gettempdir(), home]
|
|
if is_macos:
|
|
from .fast_data_types import user_cache_dir
|
|
candidates = [user_cache_dir(), '/Library/Caches']
|
|
else:
|
|
if os.environ.get('XDG_RUNTIME_DIR'):
|
|
candidates.insert(0, os.environ['XDG_RUNTIME_DIR'])
|
|
for loc in candidates:
|
|
if os.access(loc, os.W_OK | os.R_OK | os.X_OK):
|
|
yield loc
|
|
|
|
|
|
def unix_socket_paths(name: str, ext: str = '.lock') -> Generator[str, None, None]:
|
|
home = os.path.expanduser('~')
|
|
for loc in unix_socket_directories():
|
|
filename = ('.' if loc == home else '') + name + ext
|
|
yield os.path.join(loc, filename)
|
|
|
|
|
|
def parse_address_spec(spec: str) -> tuple[AddressFamily, tuple[str, int] | str, str | None]:
|
|
import socket
|
|
try:
|
|
protocol, rest = spec.split(':', 1)
|
|
except ValueError:
|
|
raise ValueError(f'Invalid listen-on value: {spec} must be of the form protocol:address')
|
|
socket_path = None
|
|
address: str | tuple[str, int] = ''
|
|
if protocol == 'unix':
|
|
family = socket.AF_UNIX
|
|
address = rest
|
|
if address.startswith('@') and len(address) > 1:
|
|
address = '\0' + address[1:]
|
|
else:
|
|
socket_path = address
|
|
elif protocol in ('tcp', 'tcp6'):
|
|
family = socket.AF_INET if protocol == 'tcp' else socket.AF_INET6
|
|
if rest.startswith('['): # ]
|
|
host = rest[1:]
|
|
host, sep, leftover = host.rpartition(']')
|
|
_, port = leftover.rsplit(':', 1)
|
|
if ':' in host and protocol == 'tcp':
|
|
family = socket.AF_INET6
|
|
else:
|
|
host, port = rest.rsplit(':', 1)
|
|
address = host, int(port)
|
|
else:
|
|
raise ValueError(f'Unknown protocol in listen-on value: {spec}')
|
|
return family, address, socket_path
|
|
|
|
|
|
def parse_os_window_state(state: str) -> int:
|
|
match state:
|
|
case 'normal':
|
|
return WINDOW_NORMAL
|
|
case 'maximized':
|
|
return WINDOW_MAXIMIZED
|
|
case 'minimized':
|
|
return WINDOW_MINIMIZED
|
|
case 'fullscreen' | 'fullscreened':
|
|
return WINDOW_FULLSCREEN
|
|
case 'hidden':
|
|
return WINDOW_HIDDEN
|
|
case _:
|
|
return WINDOW_NORMAL
|
|
|
|
|
|
def write_all(fd: int, data: str | bytes, block_until_written: bool = True) -> None:
|
|
if isinstance(data, str):
|
|
data = data.encode('utf-8')
|
|
mvd = memoryview(data)
|
|
while len(mvd) > 0:
|
|
try:
|
|
n = os.write(fd, mvd)
|
|
except BlockingIOError:
|
|
if not block_until_written:
|
|
raise
|
|
continue
|
|
if not n:
|
|
break
|
|
mvd = mvd[n:]
|
|
|
|
|
|
class TTYIO:
|
|
|
|
def __init__(self, read_with_timeout: bool = True):
|
|
self.read_with_timeout = read_with_timeout
|
|
|
|
def __enter__(self) -> 'TTYIO':
|
|
self.tty_fd, self.original_termios = open_tty(self.read_with_timeout)
|
|
return self
|
|
|
|
def __exit__(self, *a: Any) -> None:
|
|
from .fast_data_types import close_tty
|
|
close_tty(self.tty_fd, self.original_termios)
|
|
|
|
def wait_till_read_available(self) -> bool:
|
|
if self.read_with_timeout:
|
|
raise ValueError('Cannot wait when TTY is set to read with timeout')
|
|
import select
|
|
rd = select.select([self.tty_fd], [], [])[0]
|
|
return bool(rd)
|
|
|
|
def read(self, limit: int) -> bytes:
|
|
return os.read(self.tty_fd, limit)
|
|
|
|
def send(self, data: str | bytes | Iterable[str | bytes]) -> None:
|
|
if isinstance(data, (str, bytes)):
|
|
write_all(self.tty_fd, data)
|
|
else:
|
|
for chunk in data:
|
|
write_all(self.tty_fd, chunk)
|
|
|
|
def recv(self, more_needed: Callable[[bytes], bool], timeout: float, sz: int = 1) -> None:
|
|
fd = self.tty_fd
|
|
start_time = monotonic()
|
|
while timeout > monotonic() - start_time:
|
|
# will block for 0.1 secs waiting for data because we have set
|
|
# VMIN=0 VTIME=1 in termios
|
|
data = os.read(fd, sz)
|
|
if data and not more_needed(data):
|
|
break
|
|
|
|
|
|
def set_echo(fd: int = -1, on: bool = False) -> tuple[int, list[int | list[bytes | int]]]:
|
|
import termios
|
|
if fd < 0:
|
|
fd = sys.stdin.fileno()
|
|
old = termios.tcgetattr(fd)
|
|
new = termios.tcgetattr(fd)
|
|
if on:
|
|
new[3] |= termios.ECHO
|
|
else:
|
|
new[3] &= ~termios.ECHO
|
|
termios.tcsetattr(fd, termios.TCSADRAIN, new)
|
|
return fd, old
|
|
|
|
|
|
@contextmanager
|
|
def no_echo(fd: int = -1) -> Iterator[None]:
|
|
import termios
|
|
fd, old = set_echo(fd)
|
|
try:
|
|
yield
|
|
finally:
|
|
termios.tcsetattr(fd, termios.TCSADRAIN, old)
|
|
|
|
|
|
def natsort_ints(iterable: Iterable[str]) -> list[str]:
|
|
|
|
def convert(text: str) -> int | str:
|
|
return int(text) if text.isdigit() else text
|
|
|
|
def alphanum_key(key: str) -> tuple[int | str, ...]:
|
|
return tuple(map(convert, re.split(r'(\d+)', key)))
|
|
|
|
return sorted(iterable, key=alphanum_key)
|
|
|
|
|
|
def get_hostname(fallback: str = '') -> str:
|
|
import socket
|
|
try:
|
|
return socket.gethostname() or fallback
|
|
except Exception:
|
|
return fallback
|
|
|
|
|
|
def resolve_editor_cmd(editor: str, shell_env: Mapping[str, str]) -> str | None:
|
|
import shlex
|
|
editor_cmd = list(shlex_split(editor))
|
|
editor_exe = (editor_cmd or ('',))[0]
|
|
if editor_exe and os.path.isabs(editor_exe):
|
|
return editor
|
|
if not editor_exe:
|
|
return None
|
|
|
|
def patched(exe: str) -> str:
|
|
editor_cmd[0] = exe
|
|
return ' '.join(map(shlex.quote, editor_cmd))
|
|
|
|
if shell_env is os.environ:
|
|
q = which(editor_exe, only_system=True)
|
|
if q:
|
|
return patched(q)
|
|
elif 'PATH' in shell_env:
|
|
import shutil
|
|
q = shutil.which(editor_exe, path=shell_env['PATH'])
|
|
if q:
|
|
return patched(q)
|
|
return None
|
|
|
|
|
|
def get_editor_from_env(env: Mapping[str, str]) -> str | None:
|
|
for var in ('VISUAL', 'EDITOR'):
|
|
editor = env.get(var)
|
|
if editor:
|
|
editor = resolve_editor_cmd(editor, env)
|
|
if editor:
|
|
return editor
|
|
return None
|
|
|
|
|
|
def get_editor_from_env_vars(opts: Options | None = None) -> list[str]:
|
|
editor = get_editor_from_env(os.environ)
|
|
if not editor:
|
|
shell_env = read_shell_environment(opts)
|
|
editor = get_editor_from_env(shell_env)
|
|
|
|
for ans in (editor, 'vim', 'nvim', 'vi', 'emacs', 'hx', 'kak', 'micro', 'nano', 'vis'):
|
|
if ans and which(next(shlex_split(ans)), only_system=True):
|
|
break
|
|
else:
|
|
ans = 'vim'
|
|
return list(shlex_split(ans))
|
|
|
|
|
|
def get_editor(opts: Options | None = None, path_to_edit: str = '', line_number: int = 0) -> list[str]:
|
|
if opts is None:
|
|
try:
|
|
opts = get_options()
|
|
except RuntimeError:
|
|
# we are in a kitten
|
|
from .cli import create_default_opts
|
|
opts = create_default_opts()
|
|
if opts.editor == '.':
|
|
ans = get_editor_from_env_vars()
|
|
else:
|
|
ans = list(shlex_split(opts.editor))
|
|
ans[0] = os.path.expanduser(ans[0])
|
|
if path_to_edit:
|
|
if line_number:
|
|
eq = os.path.basename(ans[0]).lower()
|
|
if eq in ('code', 'code.exe'):
|
|
path_to_edit += f':{line_number}'
|
|
ans.append('--goto')
|
|
else:
|
|
ans.append(f'+{line_number}')
|
|
ans.append(path_to_edit)
|
|
return ans
|
|
|
|
|
|
def is_path_in_temp_dir(path: str) -> bool:
|
|
if not path:
|
|
return False
|
|
|
|
def abspath(x: str | None) -> str:
|
|
if x:
|
|
x = os.path.abspath(os.path.realpath(x))
|
|
return x or ''
|
|
|
|
import tempfile
|
|
path = abspath(path)
|
|
candidates = frozenset(map(abspath, ('/tmp', '/dev/shm', os.environ.get('TMPDIR', None), tempfile.gettempdir())))
|
|
for q in candidates:
|
|
if q and path.startswith(q):
|
|
return True
|
|
return False
|
|
|
|
|
|
def is_ok_to_read_image_file(path: str, fd: int) -> bool:
|
|
import stat
|
|
path = os.path.abspath(os.path.realpath(path))
|
|
try:
|
|
path_stat = os.stat(path, follow_symlinks=True)
|
|
fd_stat = os.fstat(fd)
|
|
except OSError:
|
|
return False
|
|
if not os.path.samestat(path_stat, fd_stat):
|
|
return False
|
|
parts = path.split(os.sep)[1:]
|
|
if len(parts) < 1:
|
|
return False
|
|
if parts[0] in ('sys', 'proc', 'dev'):
|
|
if parts[0] == 'dev':
|
|
return len(parts) > 2 and parts[1] == 'shm'
|
|
return False
|
|
return stat.S_ISREG(fd_stat.st_mode)
|
|
|
|
|
|
def resolve_abs_or_config_path(path: str, env: Mapping[str, str] | None = None, conf_dir: str | None = None) -> str:
|
|
path = os.path.expanduser(path)
|
|
path = expandvars(path, env or {})
|
|
if not os.path.isabs(path):
|
|
path = os.path.join(conf_dir or config_dir, path)
|
|
return path
|
|
|
|
|
|
def resolve_custom_file(path: str) -> str:
|
|
opts: Options | None = None
|
|
with suppress(RuntimeError):
|
|
opts = get_options()
|
|
return resolve_abs_or_config_path(path, opts.env if opts else {})
|
|
|
|
|
|
def func_name(f: Any) -> str:
|
|
if hasattr(f, '__name__'):
|
|
return str(f.__name__)
|
|
if hasattr(f, 'func') and hasattr(f.func, '__name__'):
|
|
return str(f.func.__name__)
|
|
return str(f)
|
|
|
|
|
|
def resolved_shell(opts: Options | None = None) -> list[str]:
|
|
q: str = getattr(opts, 'shell', '.')
|
|
if q == '.':
|
|
ans = [shell_path]
|
|
else:
|
|
env = {}
|
|
if opts is not None:
|
|
env['TERM'] = opts.term
|
|
if 'SHELL' not in os.environ:
|
|
env['SHELL'] = shell_path
|
|
if 'HOME' not in os.environ:
|
|
env['HOME'] = os.path.expanduser('~')
|
|
if 'USER' not in os.environ:
|
|
import pwd
|
|
env['USER'] = pwd.getpwuid(os.geteuid()).pw_name
|
|
def expand(x: str) -> str:
|
|
return expandvars(x, env)
|
|
ans = list(map(expand, shlex_split(q)))
|
|
return ans
|
|
|
|
|
|
@run_once
|
|
def system_paths_on_macos() -> tuple[str, ...]:
|
|
entries, seen = [], set()
|
|
|
|
def add_from_file(x: str) -> None:
|
|
try:
|
|
f = open(x)
|
|
except (FileNotFoundError, PermissionError):
|
|
return
|
|
with f:
|
|
for line in f:
|
|
line = line.strip()
|
|
if line and not line.startswith('#') and line not in seen:
|
|
if os.path.isdir(line):
|
|
seen.add(line)
|
|
entries.append(line)
|
|
try:
|
|
files = os.listdir('/etc/paths.d')
|
|
except (FileNotFoundError, PermissionError):
|
|
files = []
|
|
for name in sorted(files):
|
|
add_from_file(os.path.join('/etc/paths.d', name))
|
|
add_from_file('/etc/paths')
|
|
return tuple(entries)
|
|
|
|
|
|
def which(name: str, only_system: bool = False) -> str | None:
|
|
if os.sep in name:
|
|
return name
|
|
import shutil
|
|
|
|
opts: Options | None = None
|
|
with suppress(RuntimeError):
|
|
opts = get_options()
|
|
|
|
tried_paths = set()
|
|
paths = []
|
|
append_paths = []
|
|
if opts and opts.exe_search_path:
|
|
for x in opts.exe_search_path:
|
|
x = x.strip()
|
|
if x:
|
|
if x[0] == '-':
|
|
tried_paths.add(os.path.expanduser(x[1:]))
|
|
elif x[0] == '+':
|
|
append_paths.append(os.path.expanduser(x[1:]))
|
|
else:
|
|
paths.append(os.path.expanduser(x))
|
|
ep = os.environ.get('PATH')
|
|
if ep:
|
|
paths.extend(ep.split(os.pathsep))
|
|
paths.append(os.path.expanduser('~/.local/bin'))
|
|
paths.append(os.path.expanduser('~/bin'))
|
|
paths.extend(append_paths)
|
|
ans = shutil.which(name, path=os.pathsep.join(x for x in paths if x not in tried_paths))
|
|
if ans:
|
|
return ans
|
|
# In case PATH is messed up try a default set of paths
|
|
if is_macos:
|
|
system_paths = system_paths_on_macos()
|
|
else:
|
|
system_paths = ('/usr/local/bin', '/opt/bin', '/usr/bin', '/bin', '/usr/sbin', '/sbin')
|
|
tried_paths |= set(paths)
|
|
system_paths = tuple(x for x in system_paths if x not in tried_paths)
|
|
if system_paths:
|
|
ans = shutil.which(name, path=os.pathsep.join(system_paths))
|
|
if ans:
|
|
return ans
|
|
tried_paths |= set(system_paths)
|
|
if only_system or opts is None:
|
|
return None
|
|
shell_env = read_shell_environment(opts)
|
|
for xenv in (shell_env, opts.env):
|
|
q = xenv.get('PATH')
|
|
if q:
|
|
paths = [x for x in xenv['PATH'].split(os.pathsep) if x not in tried_paths]
|
|
ans = shutil.which(name, path=os.pathsep.join(paths))
|
|
if ans:
|
|
return ans
|
|
tried_paths |= set(paths)
|
|
return None
|
|
|
|
|
|
def read_shell_environment(opts: Options | None = None) -> dict[str, str]:
|
|
ans: dict[str, str] | None = getattr(read_shell_environment, 'ans', None)
|
|
if ans is None:
|
|
from .child import openpty
|
|
ans = {}
|
|
setattr(read_shell_environment, 'ans', ans)
|
|
import subprocess
|
|
shell = resolved_shell(opts)
|
|
master, slave = openpty()
|
|
os.set_blocking(master, False)
|
|
if '-l' not in shell and '--login' not in shell:
|
|
shell += ['-l']
|
|
if '-i' not in shell and '--interactive' not in shell:
|
|
shell += ['-i']
|
|
try:
|
|
p = subprocess.Popen(
|
|
shell + ['-c', 'env'], stdout=slave, stdin=slave, stderr=slave, start_new_session=True, close_fds=True,
|
|
preexec_fn=clear_handled_signals)
|
|
except FileNotFoundError:
|
|
log_error('Could not find shell to read environment')
|
|
return ans
|
|
with os.fdopen(master, 'rb') as stdout, os.fdopen(slave, 'wb'):
|
|
raw = b''
|
|
from time import monotonic
|
|
start_time = monotonic()
|
|
while monotonic() - start_time < 1.5:
|
|
try:
|
|
ret: int | None = p.wait(0.01)
|
|
except subprocess.TimeoutExpired:
|
|
ret = None
|
|
with suppress(Exception):
|
|
raw += stdout.read()
|
|
if ret is not None:
|
|
break
|
|
if cast(Optional[int], p.returncode) is None:
|
|
log_error('Timed out waiting for shell to quit while reading shell environment')
|
|
p.kill()
|
|
elif p.returncode == 0:
|
|
while True:
|
|
try:
|
|
x = stdout.read()
|
|
except Exception:
|
|
break
|
|
if not x:
|
|
break
|
|
raw += x
|
|
draw = raw.decode('utf-8', 'replace')
|
|
for line in draw.splitlines():
|
|
k, v = line.partition('=')[::2]
|
|
if k and v:
|
|
ans[k] = v
|
|
else:
|
|
log_error('Failed to run shell to read its environment')
|
|
return ans
|
|
|
|
|
|
def parse_uri_list(text: str) -> Generator[str, None, None]:
|
|
' Get paths from file:// URLs '
|
|
from urllib.parse import unquote, urlparse
|
|
for line in text.splitlines():
|
|
if not line or line.startswith('#'):
|
|
continue
|
|
if not line.startswith('file://'):
|
|
yield line
|
|
continue
|
|
try:
|
|
purl = urlparse(line, allow_fragments=False)
|
|
except Exception:
|
|
yield line
|
|
continue
|
|
if purl.path:
|
|
yield unquote(purl.path)
|
|
|
|
|
|
def edit_config_file() -> None:
|
|
from kitty.config import prepare_config_file_for_editing
|
|
p = prepare_config_file_for_editing()
|
|
editor = get_editor()
|
|
os.execvp(editor[0], editor + [p])
|
|
|
|
|
|
class SSHConnectionData(NamedTuple):
|
|
binary: str
|
|
hostname: str
|
|
port: int | None = None
|
|
identity_file: str = ''
|
|
extra_args: tuple[tuple[str, str], ...] = ()
|
|
|
|
|
|
def get_new_os_window_size(
|
|
metrics: 'OSWindowSize', width: int, height: int, unit: str, incremental: bool = False, has_window_scaling: bool = True
|
|
) -> tuple[int, int]:
|
|
if unit == 'cells':
|
|
cw = metrics['cell_width']
|
|
ch = metrics['cell_height']
|
|
width *= cw
|
|
height *= ch
|
|
if has_window_scaling:
|
|
width = round(width / metrics['xscale'])
|
|
height = round(height / metrics['yscale'])
|
|
if incremental:
|
|
w = metrics['width'] + width
|
|
h = metrics['height'] + height
|
|
else:
|
|
w = width or metrics['width']
|
|
h = height or metrics['height']
|
|
return w, h
|
|
|
|
|
|
def get_all_processes() -> Iterable[int]:
|
|
if is_macos:
|
|
from kitty.fast_data_types import get_all_processes as f
|
|
yield from f()
|
|
else:
|
|
for c in os.listdir('/proc'):
|
|
if c.isdigit():
|
|
yield int(c)
|
|
|
|
|
|
def is_kitty_gui_cmdline(*cmd: str) -> bool:
|
|
if not cmd:
|
|
return False
|
|
if os.path.basename(cmd[0]) != 'kitty':
|
|
return False
|
|
if len(cmd) == 1:
|
|
return True
|
|
s = cmd[1][:1]
|
|
if s == '@':
|
|
return False
|
|
if s == '+':
|
|
if cmd[1] == '+':
|
|
return len(cmd) > 2 and cmd[2] == 'open'
|
|
return cmd[1] == '+open'
|
|
return True
|
|
|
|
|
|
def reload_conf_in_all_kitties() -> None:
|
|
import signal
|
|
|
|
from kitty.child import cmdline_of_pid
|
|
|
|
for pid in get_all_processes():
|
|
try:
|
|
cmd = cmdline_of_pid(pid)
|
|
except Exception:
|
|
continue
|
|
if cmd and is_kitty_gui_cmdline(*cmd):
|
|
os.kill(pid, signal.SIGUSR1)
|
|
|
|
|
|
@run_once
|
|
def control_codes_pat() -> 'Pattern[str]':
|
|
return re.compile('[\x00-\x09\x0b-\x1f\x7f-\x9f]')
|
|
|
|
|
|
def sanitize_control_codes(text: str, replace_with: str = '') -> str:
|
|
return control_codes_pat().sub(replace_with, text)
|
|
|
|
|
|
def hold_till_enter() -> None:
|
|
import subprocess
|
|
|
|
from .constants import kitten_exe
|
|
subprocess.Popen([kitten_exe(), '__hold_till_enter__']).wait()
|
|
|
|
|
|
def cleanup_ssh_control_masters() -> None:
|
|
import glob
|
|
import subprocess
|
|
try:
|
|
files = frozenset(glob.glob(os.path.join(runtime_dir(), ssh_control_master_template.format(
|
|
kitty_pid=os.getpid(), ssh_placeholder='*'))))
|
|
except OSError:
|
|
return
|
|
workers = tuple(subprocess.Popen([
|
|
'ssh', '-o', f'ControlPath={x}', '-O', 'exit', 'kitty-unused-host-name'], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
|
|
preexec_fn=clear_handled_signals) for x in files)
|
|
for w in workers:
|
|
w.wait()
|
|
for x in files:
|
|
with suppress(OSError):
|
|
os.remove(x)
|
|
|
|
|
|
def path_from_osc7_url(url: str | bytes) -> str:
|
|
if isinstance(url, bytes):
|
|
url = url.decode('utf-8')
|
|
if url.startswith('kitty-shell-cwd://'):
|
|
return '/' + url.split('/', 3)[-1]
|
|
if url.startswith('file://'):
|
|
from urllib.parse import unquote, urlparse
|
|
return unquote(urlparse(url).path)
|
|
return ''
|
|
|
|
|
|
@run_once
|
|
def macos_version() -> tuple[int, ...]:
|
|
# platform.mac_ver does not work thanks to Apple's stupid "hardening", so just use sw_vers
|
|
import subprocess
|
|
try:
|
|
o = subprocess.check_output(['sw_vers', '-productVersion'], stderr=subprocess.STDOUT).decode()
|
|
except Exception:
|
|
return 0, 0, 0
|
|
return tuple(map(int, o.strip().split('.')))
|
|
|
|
|
|
@lru_cache(maxsize=2)
|
|
def less_version(less_exe: str = 'less') -> int:
|
|
import subprocess
|
|
o = subprocess.check_output([less_exe, '-V'], stderr=subprocess.STDOUT).decode()
|
|
m = re.match(r'less (\d+)', o)
|
|
if m is None:
|
|
raise ValueError(f'Invalid version string for less: {o}')
|
|
return int(m.group(1))
|
|
|
|
|
|
def is_pid_alive(pid: int) -> bool:
|
|
try:
|
|
os.kill(pid, 0)
|
|
except ProcessLookupError:
|
|
return False
|
|
except Exception:
|
|
pass
|
|
return True
|
|
|
|
|
|
def safer_fork() -> int:
|
|
pid = os.fork()
|
|
if pid:
|
|
# master
|
|
import ssl
|
|
ssl.RAND_add(os.urandom(32), 0.0)
|
|
else:
|
|
# child
|
|
import atexit
|
|
atexit._clear()
|
|
return pid
|
|
|
|
|
|
def docs_url(which: str = '', local_docs_root: str | None = '') -> str:
|
|
from urllib.parse import quote
|
|
|
|
from .conf.types import resolve_ref
|
|
from .constants import local_docs, website_url
|
|
if local_docs_root is None:
|
|
ld = ''
|
|
else:
|
|
ld = local_docs_root or local_docs()
|
|
base, frag = which.partition('#')[::2]
|
|
base = base.strip('/')
|
|
if frag.startswith('ref='):
|
|
ref = frag[4:]
|
|
which = resolve_ref(ref, lambda x: x)
|
|
if which.startswith('https://') or which.startswith('http://'):
|
|
return which
|
|
base, frag = which.partition('#')[::2]
|
|
base = base.strip('/')
|
|
if ld:
|
|
base = base or 'index'
|
|
url = f'file://{ld}/' + quote(base) + '.html'
|
|
else:
|
|
url = website_url(base)
|
|
if frag:
|
|
url += '#' + frag
|
|
return url
|
|
|
|
|
|
def sanitize_for_bracketed_paste(text: bytes) -> bytes:
|
|
pat = re.compile(b'(?:(?:\033\\\x5b)|(?:\x9b))201~')
|
|
while True:
|
|
new_text = pat.sub(b'', text)
|
|
if new_text == text:
|
|
break
|
|
text = new_text
|
|
return text
|
|
|
|
|
|
@lru_cache(maxsize=64)
|
|
def sanitize_url_for_display_to_user(url: str) -> str:
|
|
from urllib.parse import unquote, urlparse, urlunparse
|
|
try:
|
|
purl = urlparse(url)
|
|
if purl.netloc:
|
|
purl = purl._replace(netloc=purl.netloc.encode('idna').decode('ascii'))
|
|
if purl.path:
|
|
purl = purl._replace(path=unquote(purl.path))
|
|
url = urlunparse(purl)
|
|
except Exception as e:
|
|
log_error(e)
|
|
url = 'Unparseable URL: ' + url
|
|
return url
|
|
|
|
|
|
def extract_all_from_tarfile_safely(tf: 'tarfile.TarFile', dest: str) -> None:
|
|
# Ensure that all extracted items are within dest
|
|
|
|
def is_within_directory(directory: str, target: str) -> bool:
|
|
abs_directory = os.path.abspath(directory)
|
|
abs_target = os.path.abspath(target)
|
|
prefix = os.path.commonprefix((abs_directory, abs_target))
|
|
return prefix == abs_directory
|
|
|
|
def safe_extract(tar: 'tarfile.TarFile', path: str = ".", numeric_owner: bool = False) -> None:
|
|
for member in tar.getmembers():
|
|
member_path = os.path.join(path, member.name)
|
|
if not is_within_directory(path, member_path):
|
|
raise ValueError(f'Attempted path traversal in tar file: {member.name}')
|
|
tar.extractall(path, tar.getmembers(), numeric_owner=numeric_owner)
|
|
|
|
safe_extract(tf, dest)
|
|
|
|
|
|
def is_png(path: str) -> bool:
|
|
if path:
|
|
with suppress(Exception), open(path, 'rb') as f:
|
|
header = f.read(8)
|
|
return header.startswith(b'\211PNG\r\n\032\n')
|
|
return False
|
|
|
|
|
|
def cmdline_for_hold(cmd: Sequence[str] = (), opts: Optional['Options'] = None) -> list[str]:
|
|
if opts is None:
|
|
with suppress(RuntimeError):
|
|
opts = get_options()
|
|
if opts is None:
|
|
from .options.types import defaults
|
|
opts = defaults
|
|
ksi = ' '.join(opts.shell_integration)
|
|
import shlex
|
|
shell = shlex.join(resolved_shell(opts))
|
|
return [kitten_exe(), 'run-shell', f'--shell={shell}', f'--shell-integration={ksi}', '--env=KITTY_HOLD=1'] + list(cmd)
|
|
|
|
|
|
def safe_mtime(path: str) -> float | None:
|
|
with suppress(OSError):
|
|
return os.path.getmtime(path)
|
|
return None
|
|
|
|
|
|
@run_once
|
|
def get_custom_window_icon() -> tuple[float, str] | tuple[None, None]:
|
|
filenames = ['kitty.app.png']
|
|
if is_macos:
|
|
# On macOS, prefer icns to png.
|
|
filenames.insert(0, 'kitty.app.icns')
|
|
for name in filenames:
|
|
custom_icon_path = os.path.join(config_dir, name)
|
|
custom_icon_mtime = safe_mtime(custom_icon_path)
|
|
if custom_icon_mtime is not None:
|
|
return custom_icon_mtime, custom_icon_path
|
|
return None, None
|
|
|
|
|
|
def key_val_matcher(items: Iterable[tuple[str, str]], key_pat: 're.Pattern[str]', val_pat: Optional['re.Pattern[str]']) -> bool:
|
|
for key, val in items:
|
|
if key_pat.search(key) is not None and (
|
|
val_pat is None or val_pat.search(val) is not None):
|
|
return True
|
|
return False
|
|
|
|
|
|
def shlex_split(text: str, allow_ansi_quoted_strings: bool = False) -> Iterator[str]:
|
|
yield from Shlex(text, allow_ansi_quoted_strings)
|
|
|
|
|
|
def shlex_split_with_positions(text: str, allow_ansi_quoted_strings: bool = False) -> Iterator[tuple[int, str]]:
|
|
s = Shlex(text, allow_ansi_quoted_strings)
|
|
while (q := s.next_word())[0] > -1:
|
|
yield q
|
|
|
|
|
|
def timed_debug_print(*a: Any, sep: str = ' ', end: str = '\n') -> None:
|
|
_timed_debug_print(sep.join(map(str, a)) + end)
|
|
|
|
|
|
def lock_file(f: BinaryIO) -> None:
|
|
if not f.writable():
|
|
raise ValueError('Cannot lock files not opened in writable mode')
|
|
fcntl.lockf(f, fcntl.LOCK_EX)
|
|
|
|
|
|
def unlock_file(f: BinaryIO) -> None:
|
|
if not f.writable():
|
|
raise ValueError('Cannot unlock files not opened in writable mode')
|
|
fcntl.lockf(f, fcntl.LOCK_UN)
|