Files
kitty-mirror/kitty/utils.py
Kovid Goyal d52f2e7981 Rewrite rendering pipeline
This was needed to fix various corner cases when doing blending of colors
in linear space. The new architecture has the same performance as the
old in the common case of opaque rendering with no UI layers or images.

In the case of only positive z-index images there is a performance
decrease as the OS Window is now rendered to an offscreen texture and
then blitted to screen. However, in the future when we move to Vulkan or
I can figure out how to get Wayland to accept buffers with colors in
linear space, this performance penalty can be removed. The performance
penalty was not significant on my system but this is highly GPU
dependent. Modern GPUs are supposedly optimised for rendering to
offscreen buffers, so we will see. The awrit project might be a good
test case.

Now either we have 1-shot rendering for the case of opaque with only text
or all the various pieces are rendered in successive draw calls into an
offscreen buffer that is blitted to the output buffer after all drawing
is done.

Fixes #8869
2025-08-11 00:47:02 +05:30

1131 lines
34 KiB
Python

#!/usr/bin/env python
# License: GPL v3 Copyright: 2016, Kovid Goyal <kovid at kovidgoyal.net>
import fcntl
import math
import os
import re
import string
import sys
from collections.abc import Callable, Generator, Iterable, Iterator, Mapping, Sequence
from contextlib import contextmanager, suppress
from functools import lru_cache
from re import Match, Pattern
from typing import (
TYPE_CHECKING,
Any,
BinaryIO,
NamedTuple,
Optional,
cast,
)
from .constants import (
clear_handled_signals,
config_dir,
is_macos,
is_wayland,
kitten_exe,
runtime_dir,
shell_path,
ssh_control_master_template,
)
from .fast_data_types import WINDOW_FULLSCREEN, WINDOW_HIDDEN, WINDOW_MAXIMIZED, WINDOW_MINIMIZED, WINDOW_NORMAL, Color, Shlex, get_options, monotonic, open_tty
from .fast_data_types import timed_debug_print as _timed_debug_print
from .types import run_once
from .typing_compat import AddressFamily, PopenType, StartupCtx
if TYPE_CHECKING:
import tarfile
from .fast_data_types import OSWindowSize
from .options.types import Options
else:
Options = object
class Flag:
    '''
    A boolean switch that can be used as a context manager: its value is
    inverted when a ``with`` block is entered and inverted back when the
    block is left.
    '''

    def __init__(self, initial_val: bool = True) -> None:
        self.val = initial_val

    def __bool__(self) -> bool:
        return self.val

    def __enter__(self) -> None:
        self.val = self.val ^ True

    def __exit__(self, *a: object) -> None:
        self.val = self.val ^ True
# Module-wide switch: while this flag is truthy expandvars() returns its input unchanged
disallow_expand_vars = Flag(False)
def expandvars(val: str, env: Mapping[str, str] = {}, fallback_to_os_env: bool = True) -> str:
    '''
    Expand $VAR and ${VAR} references in val. Use $$ for a literal $.

    Variables are looked up in env first and then, when fallback_to_os_env is
    true, in os.environ. References to unknown variables are left as they
    are. Nothing is expanded while disallow_expand_vars is set.
    '''
    if disallow_expand_vars or '$' not in val:
        return val

    def lookup(match: 'Match[str]') -> str:
        name = match.group(1) or match.group(2)
        value = env.get(name)
        if value is None and fallback_to_os_env:
            value = os.environ.get(name)
        return match.group() if value is None else value

    # Hide escaped dollars behind a NUL placeholder while substituting
    protected = val.replace('$$', '\0')
    expanded = re.sub(r'\$(?:(\w+)|\{([^}]+)\})', lookup, protected)
    return expanded.replace('\0', '$')
@lru_cache(maxsize=2)
def sgr_sanitizer_pat(for_splitting: bool = False) -> 're.Pattern[str]':
    '''
    Return a compiled, cached pattern matching ESC [ ... m escape sequences.

    With for_splitting the pattern is wrapped in a capturing group, so that
    splitting with it keeps the matched sequences in the result.
    '''
    sgr = '\033\\[.*?m'
    return re.compile(f'({sgr})' if for_splitting else sgr)
@run_once
def kitty_ansi_sanitizer_pat() -> 're.Pattern[str]':
    '''
    Pattern that removes the ANSI sequences generated by kitty's own ANSI
    output routines: ESC [ ... m sequences made of digits, semicolons and
    colons, and ESC ] sequences that end with ESC followed by a backslash.

    Not suitable for stripping general ANSI sequences.
    '''
    csi_m = r'\[[0-9;:]*?m'
    esc_bracket_string = r'\].*?\x1b\\'
    return re.compile(r'\x1b(?:' + csi_m + '|' + esc_bracket_string + ')')
def platform_window_id(os_window_id: int) -> int | None:
    '''
    Return the platform native window id for the kitty OS window
    os_window_id, or None when it cannot be obtained.

    The Cocoa id is tried on macOS and the X11 id whenever not running under
    Wayland; errors raised by either lookup are ignored.
    '''
    if is_macos:
        from .fast_data_types import cocoa_window_id
        try:
            return cocoa_window_id(os_window_id)
        except Exception:
            pass
    if not is_wayland():
        from .fast_data_types import x11_window_id
        try:
            return x11_window_id(os_window_id)
        except Exception:
            pass
    return None
def safe_print(*a: Any, **k: Any) -> None:
    '''Call print() with the given arguments, ignoring any exception it raises.'''
    try:
        print(*a, **k)
    except Exception:
        pass
def log_error(*a: Any, **k: str) -> None:
    '''
    Log a message assembled print()-style from the arguments. Only the sep
    and end keyword arguments are used, defaulting to a space and to the
    empty string respectively.

    The message is handed to log_error.redirect when that attribute has been
    set and to log_error_string otherwise. Errors raised while building or
    emitting the message are ignored.
    '''
    from .fast_data_types import log_error_string
    sink = getattr(log_error, 'redirect', log_error_string)
    try:
        separator = k.get('sep', ' ')
        terminator = k.get('end', '')
        sink(separator.join(map(str, a)) + terminator)
    except Exception:
        pass
@contextmanager
def suppress_error_logging() -> Iterator[None]:
    '''
    Context manager inside which everything passed to log_error() is
    discarded. The redirect that was in place before, if any, is restored
    on exit.
    '''
    # This function object doubles as the "no redirect was set" sentinel
    not_set = suppress_error_logging
    previous = getattr(log_error, 'redirect', not_set)
    setattr(log_error, 'redirect', lambda *a: None)
    try:
        yield
    finally:
        if previous is not not_set:
            setattr(log_error, 'redirect', previous)
        else:
            delattr(log_error, 'redirect')
def ceil_int(x: float) -> int:
    '''Return the smallest integer that is not less than x, as an int.'''
    ceiling = math.ceil(x)
    return int(ceiling)
def sanitize_title(x: str) -> str:
    '''
    Make x safe to use as a title: remove all C0 control characters
    (0x00-0x1f, which includes ESC), DEL and the C1 control characters
    (0x7f-0x9f), then collapse every remaining run of whitespace into a
    single space.
    '''
    # The removed range must reach 0x1f: stopping short of it would let the
    # escape character (0x1b) through into the title.
    without_controls = re.sub(r'[\0-\x1f\x7f-\x9f]', '', x)
    return re.sub(r'\s+', ' ', without_controls)
def color_as_int(val: Color) -> int:
    '''Return int(val) restricted to its low 24 bits.'''
    low_24_bits = 0xffffff
    return int(val) & low_24_bits
def color_from_int(val: int) -> Color:
    '''
    Build a Color from the three low bytes of val, passing them to Color
    from the most significant byte to the least significant one.
    '''
    first, second, third = ((val >> shift) & 0xFF for shift in (16, 8, 0))
    return Color(first, second, third)
class ScreenSize(NamedTuple):
    # The size of a terminal, as produced by read_screen_size()

    # The four values reported by the TIOCGWINSZ ioctl, in the order reported
    rows: int
    cols: int
    width: int
    height: int
    # Derived values: width // cols and height // rows
    cell_width: int
    cell_height: int
def read_screen_size(fd: int = -1) -> ScreenSize:
    '''
    Query the size of the terminal attached to fd with the TIOCGWINSZ ioctl.

    A negative fd means sys.stdout. The cell dimensions are computed by
    integer division, with a reported count of zero rows or columns treated
    as one to avoid dividing by zero.
    '''
    import array
    import fcntl
    import termios
    if fd < 0:
        fd = sys.stdout.fileno()
    winsize = array.array('H', [0, 0, 0, 0])
    fcntl.ioctl(fd, termios.TIOCGWINSZ, cast(bytearray, winsize))
    rows, cols, width, height = tuple(winsize)
    cell_width = width // (cols or 1)
    cell_height = height // (rows or 1)
    return ScreenSize(rows, cols, width, height, cell_width, cell_height)
class ScreenSizeGetter:
    '''
    Callable that returns the size of the terminal on a file descriptor,
    caching the answer.

    The size is read again only while the changed attribute is true; set it
    to True to force a fresh read on the next call.
    '''

    changed = True
    Size = ScreenSize
    ans: ScreenSize | None = None

    def __init__(self, fd: int | None):
        # No fd given means use sys.stdout
        self.fd = sys.stdout.fileno() if fd is None else fd

    def __call__(self) -> ScreenSize:
        if not self.changed:
            return cast(ScreenSize, self.ans)
        self.ans = read_screen_size(self.fd)
        self.changed = False
        return cast(ScreenSize, self.ans)
@lru_cache(maxsize=64, typed=True)
def screen_size_function(fd: int | None = None) -> ScreenSizeGetter:
    '''
    Return a ScreenSizeGetter for fd. Results are cached, so repeated calls
    with the same fd return the same getter object.
    '''
    getter = ScreenSizeGetter(fd)
    return getter
def fit_image(width: int, height: int, pwidth: int, pheight: int) -> tuple[int, int]:
    '''
    Shrink (width, height) until it fits inside (pwidth, pheight), scaling
    both dimensions by the same factor and rounding down. Sizes that already
    fit are returned unchanged.
    '''
    from math import floor

    def limit_height(w: int, h: int) -> tuple[int, int]:
        if h <= pheight:
            return w, h
        ratio = pheight / float(h)
        return floor(ratio * w), pheight

    width, height = limit_height(width, height)
    if width > pwidth:
        ratio = pwidth / float(width)
        width, height = pwidth, floor(ratio * height)
    # Checked once more in case the height is still too large
    width, height = limit_height(width, height)
    return int(width), int(height)
def base64_encode(
    integer: int,
    chars: str = string.ascii_uppercase + string.ascii_lowercase + string.digits +
    '+/'
) -> str:
    '''
    Encode a non-negative integer as a string of base-64 digits, most
    significant digit first, using chars as the digit alphabet. Zero is
    encoded as the first character of chars.

    Raises ValueError for negative integers: repeated floor division of a
    negative number never reaches zero, so the digit extraction loop below
    would otherwise run forever.
    '''
    if integer < 0:
        raise ValueError(f'Cannot base64 encode a negative integer: {integer}')
    digits = []
    while True:
        integer, remainder = divmod(integer, 64)
        digits.append(chars[remainder])
        if integer == 0:
            break
    # Digits were produced least significant first
    return ''.join(reversed(digits))
def command_for_open(program: str | list[str] = 'default') -> list[str]:
    '''
    Return the command line to use for opening things with program.

    A string is first converted to a command line with to_cmdline(). The
    command line ['default'] selects the platform opener: open on macOS and
    xdg-open everywhere else. Any other command line is returned as is.
    '''
    if isinstance(program, str):
        from .conf.utils import to_cmdline
        program = to_cmdline(program)
    if program != ['default']:
        return program
    return ['open'] if is_macos else ['xdg-open']
def open_cmd(cmd: Iterable[str] | list[str], arg: None | Iterable[str] | str = None,
             cwd: str | None = None, extra_env: dict[str, str] | None = None) -> 'PopenType[bytes]':
    '''
    Start cmd as a child process with stdin, stdout and stderr all connected
    to the null device, and return the process object.

    arg, either a single string or several strings, is appended to cmd.
    A non-empty extra_env is applied on top of a copy of os.environ;
    otherwise the child inherits the current environment. An empty cwd
    leaves the working directory unchanged.
    '''
    import subprocess
    if arg is not None:
        extra_args = [arg] if isinstance(arg, str) else arg
        cmd = [*cmd, *extra_args]
    child_env: dict[str, str] | None = None
    if extra_env:
        child_env = {**os.environ, **extra_env}
    null = subprocess.DEVNULL
    return subprocess.Popen(
        tuple(cmd), stdin=null, stdout=null, stderr=null, cwd=cwd or None,
        preexec_fn=clear_handled_signals, env=child_env)
def open_url(url: str, program: str | list[str] = 'default', cwd: str | None = None, extra_env: dict[str, str] | None = None) -> 'PopenType[bytes]':
    '''
    Open url with program, as resolved by command_for_open(), and return
    the process started by open_cmd().
    '''
    opener = command_for_open(program)
    return open_cmd(opener, url, cwd=cwd, extra_env=extra_env)
def init_startup_notification_x11(window_handle: int, startup_id: str | None = None) -> Optional['StartupCtx']:
    '''
    Begin X11 startup notification for window_handle.

    The startup id is startup_id or, failing that, the value of the
    DESKTOP_STARTUP_ID environment variable. Returns None when there is no
    startup id or no X11 display.
    '''
    # https://specifications.freedesktop.org/startup-notification-spec/startup-notification-latest.txt
    from kitty.fast_data_types import init_x11_startup_notification
    sid = startup_id
    if not sid:
        # pop() rather than get() to ensure child processes don't get this env var
        sid = os.environ.pop('DESKTOP_STARTUP_ID', None)
    if sid:
        from .fast_data_types import x11_display
        display = x11_display()
        if display:
            return init_x11_startup_notification(display, window_handle, sid)
    return None
def end_startup_notification_x11(ctx: 'StartupCtx') -> None:
    '''Finish the X11 startup notification represented by ctx.'''
    from kitty.fast_data_types import end_x11_startup_notification as finish
    finish(ctx)
def init_startup_notification(window_handle: int | None, startup_id: str | None = None) -> Optional['StartupCtx']:
    '''
    Begin startup notification for a window. This is only done on X11.

    Returns None on macOS and Wayland, when window_handle is None and when
    starting the notification fails. Failures are reported but never raised.
    '''
    if is_macos or is_wayland():
        return None
    if window_handle is None:
        log_error('Could not perform startup notification as window handle not present')
        return None
    try:
        try:
            return init_startup_notification_x11(window_handle, startup_id)
        except OSError as e:
            library_missing = str(e).startswith("Failed to load libstartup-notification")
            if not library_missing:
                # Reported with a traceback by the outer handler
                raise e
            log_error(
                f'{e}. This has two main effects:',
                'There will be no startup feedback and when using --single-instance, kitty windows may start on an incorrect desktop/workspace.')
    except Exception:
        from traceback import print_exc
        print_exc()
    return None
def end_startup_notification(ctx: Optional['StartupCtx']) -> None:
    '''
    End the startup notification represented by ctx. Does nothing when ctx
    is falsy or when running on macOS or Wayland. Errors are printed as a
    traceback instead of being raised.
    '''
    if not ctx or is_macos or is_wayland():
        return
    try:
        end_startup_notification_x11(ctx)
    except Exception:
        from traceback import print_exc
        print_exc()
class startup_notification_handler:
# WARNING: This only works on X11 on other platforms extra_callback will be called
# after the window is shown, not before, as they do not do two stage window
# creation.
def __init__(self, do_notify: bool = True, startup_id: str | None = None, extra_callback: Callable[[int], None] | None = None):
self.do_notify = do_notify
self.startup_id = startup_id
self.extra_callback = extra_callback
self.ctx: Optional['StartupCtx'] = None
def __enter__(self) -> Callable[[int], None]:
def pre_show_callback(window_handle: int) -> None:
if self.extra_callback is not None:
self.extra_callback(window_handle)
if self.do_notify:
self.ctx = init_startup_notification(window_handle, self.startup_id)
return pre_show_callback
def __exit__(self, *a: Any) -> None:
if self.ctx is not None:
end_startup_notification(self.ctx)
def unix_socket_directories() -> Iterator[str]:
    '''
    Yield, in order of preference, the candidate directories for unix
    sockets that the current process can read, write and search.

    The candidates are the user cache directory and /Library/Caches on
    macOS. Elsewhere they are $XDG_RUNTIME_DIR when set, the temporary
    directory and the home directory.
    '''
    import tempfile
    home = os.path.expanduser('~')
    candidates = [tempfile.gettempdir(), home]
    if is_macos:
        from .fast_data_types import user_cache_dir
        candidates = [user_cache_dir(), '/Library/Caches']
    else:
        runtime_directory = os.environ.get('XDG_RUNTIME_DIR')
        if runtime_directory:
            candidates.insert(0, os.environ['XDG_RUNTIME_DIR'])
    required_access = os.W_OK | os.R_OK | os.X_OK
    yield from (loc for loc in candidates if os.access(loc, required_access))
def unix_socket_paths(name: str, ext: str = '.lock') -> Generator[str, None, None]:
    '''
    Yield the path name + ext inside every directory produced by
    unix_socket_directories(). Inside the home directory the file name gets
    a leading dot.
    '''
    home = os.path.expanduser('~')
    for directory in unix_socket_directories():
        prefix = '.' if directory == home else ''
        yield os.path.join(directory, prefix + name + ext)
def parse_address_spec(spec: str) -> tuple[AddressFamily, tuple[str, int] | str, str | None]:
import socket
try:
protocol, rest = spec.split(':', 1)
except ValueError:
raise ValueError(f'Invalid listen-on value: {spec} must be of the form protocol:address')
socket_path = None
address: str | tuple[str, int] = ''
if protocol == 'unix':
family = socket.AF_UNIX
address = rest
if address.startswith('@') and len(address) > 1:
address = '\0' + address[1:]
else:
socket_path = address
elif protocol in ('tcp', 'tcp6'):
family = socket.AF_INET if protocol == 'tcp' else socket.AF_INET6
if rest.startswith('['): # ]
host = rest[1:]
host, sep, leftover = host.rpartition(']')
_, port = leftover.rsplit(':', 1)
if ':' in host and protocol == 'tcp':
family = socket.AF_INET6
else:
host, port = rest.rsplit(':', 1)
address = host, int(port)
else:
raise ValueError(f'Unknown protocol in listen-on value: {spec}')
return family, address, socket_path
def parse_os_window_state(state: str) -> int:
    '''
    Translate the name of an OS window state into the matching WINDOW_*
    constant. Unrecognised names map to WINDOW_NORMAL.
    '''
    if state == 'maximized':
        return WINDOW_MAXIMIZED
    if state == 'minimized':
        return WINDOW_MINIMIZED
    if state == 'fullscreen' or state == 'fullscreened':
        return WINDOW_FULLSCREEN
    if state == 'hidden':
        return WINDOW_HIDDEN
    # Covers 'normal' as well as every unknown name
    return WINDOW_NORMAL
def write_all(fd: int, data: str | bytes, block_until_written: bool = True) -> None:
if isinstance(data, str):
data = data.encode('utf-8')
mvd = memoryview(data)
while len(mvd) > 0:
try:
n = os.write(fd, mvd)
except BlockingIOError:
if not block_until_written:
raise
continue
if not n:
break
mvd = mvd[n:]
class TTYIO:
def __init__(self, read_with_timeout: bool = True):
self.read_with_timeout = read_with_timeout
def __enter__(self) -> 'TTYIO':
self.tty_fd, self.original_termios = open_tty(self.read_with_timeout)
return self
def __exit__(self, *a: Any) -> None:
from .fast_data_types import close_tty
close_tty(self.tty_fd, self.original_termios)
def wait_till_read_available(self) -> bool:
if self.read_with_timeout:
raise ValueError('Cannot wait when TTY is set to read with timeout')
import select
rd = select.select([self.tty_fd], [], [])[0]
return bool(rd)
def read(self, limit: int) -> bytes:
return os.read(self.tty_fd, limit)
def send(self, data: str | bytes | Iterable[str | bytes]) -> None:
if isinstance(data, (str, bytes)):
write_all(self.tty_fd, data)
else:
for chunk in data:
write_all(self.tty_fd, chunk)
def recv(self, more_needed: Callable[[bytes], bool], timeout: float, sz: int = 1) -> None:
fd = self.tty_fd
start_time = monotonic()
while timeout > monotonic() - start_time:
# will block for 0.1 secs waiting for data because we have set
# VMIN=0 VTIME=1 in termios
data = os.read(fd, sz)
if data and not more_needed(data):
break
def set_echo(fd: int = -1, on: bool = False) -> tuple[int, list[int | list[bytes | int]]]:
    '''
    Turn the termios ECHO flag of the terminal on fd on or off. A negative
    fd means sys.stdin.

    Returns the file descriptor actually used together with the termios
    attributes that were in effect before the change.
    '''
    import termios
    if fd < 0:
        fd = sys.stdin.fileno()
    old = termios.tcgetattr(fd)
    new = termios.tcgetattr(fd)
    # Index 3 of the attribute list holds the flags that ECHO belongs to
    new[3] = (new[3] | termios.ECHO) if on else (new[3] & ~termios.ECHO)
    termios.tcsetattr(fd, termios.TCSADRAIN, new)
    return fd, old
@contextmanager
def no_echo(fd: int = -1) -> Iterator[None]:
    '''
    Context manager that turns off echo on fd (see set_echo()) and puts the
    previous termios attributes back on exit.
    '''
    from termios import TCSADRAIN, tcsetattr
    fd, saved_attributes = set_echo(fd)
    try:
        yield
    finally:
        tcsetattr(fd, TCSADRAIN, saved_attributes)
def natsort_ints(iterable: Iterable[str]) -> list[str]:
def convert(text: str) -> int | str:
return int(text) if text.isdigit() else text
def alphanum_key(key: str) -> tuple[int | str, ...]:
return tuple(map(convert, re.split(r'(\d+)', key)))
return sorted(iterable, key=alphanum_key)
def get_hostname(fallback: str = '') -> str:
    '''
    Return the host name of this machine, or fallback when the name is
    empty or cannot be determined.
    '''
    from socket import gethostname
    try:
        name = gethostname()
    except Exception:
        return fallback
    return name or fallback
def resolve_editor_cmd(editor: str, shell_env: Mapping[str, str]) -> str | None:
    '''
    Resolve the executable of the editor command line to a full path.

    Returns editor unchanged when its executable is already an absolute
    path. Otherwise returns the command line, shell quoted, with the
    executable replaced by its full path, or None when the command line is
    empty or the executable cannot be found.

    The executable is looked up with which() when shell_env is os.environ
    and on the PATH from shell_env otherwise.
    '''
    import shlex
    editor_cmd = list(shlex_split(editor))
    editor_exe = editor_cmd[0] if editor_cmd else ''
    if not editor_exe:
        return None
    if os.path.isabs(editor_exe):
        return editor
    if shell_env is os.environ:
        found = which(editor_exe, only_system=True)
    elif 'PATH' in shell_env:
        import shutil
        found = shutil.which(editor_exe, path=shell_env['PATH'])
    else:
        found = None
    if not found:
        return None
    editor_cmd[0] = found
    return ' '.join(map(shlex.quote, editor_cmd))
def get_editor_from_env(env: Mapping[str, str]) -> str | None:
for var in ('VISUAL', 'EDITOR'):
editor = env.get(var)
if editor:
editor = resolve_editor_cmd(editor, env)
if editor:
return editor
return None
def get_editor_from_env_vars(opts: Options | None = None) -> list[str]:
    '''
    Return the command line of the editor to use, based on environment
    variables.

    The editor configured in os.environ is preferred, then the one
    configured in the environment of the shell. If neither is usable, the
    first of a list of well known editors that is installed is used, with
    vim as the last resort.
    '''
    editor = get_editor_from_env(os.environ) or get_editor_from_env(read_shell_environment(opts))
    chosen = 'vim'
    for candidate in (editor, 'vim', 'nvim', 'vi', 'emacs', 'hx', 'kak', 'micro', 'nano', 'vis'):
        if candidate and which(next(shlex_split(candidate)), only_system=True):
            chosen = candidate
            break
    return list(shlex_split(chosen))
def get_editor(opts: Options | None = None, path_to_edit: str = '', line_number: int = 0) -> list[str]:
    '''
    Return the command line for running the editor, optionally opening
    path_to_edit at line_number.

    An editor option of '.' means use get_editor_from_env_vars(). For
    editors whose executable is named code the line number is passed as
    --goto path:line, for all others as +line before the path.
    '''
    if opts is None:
        try:
            opts = get_options()
        except RuntimeError:
            # we are in a kitten
            from .cli import create_default_opts
            opts = create_default_opts()
    ans = get_editor_from_env_vars() if opts.editor == '.' else list(shlex_split(opts.editor))
    ans[0] = os.path.expanduser(ans[0])
    if not path_to_edit:
        return ans
    if line_number:
        exe_name = os.path.basename(ans[0]).lower()
        if exe_name in ('code', 'code.exe'):
            ans.append('--goto')
            path_to_edit += f':{line_number}'
        else:
            ans.append(f'+{line_number}')
    ans.append(path_to_edit)
    return ans
def is_path_in_temp_dir(path: str) -> bool:
if not path:
return False
def abspath(x: str | None) -> str:
if x:
x = os.path.abspath(os.path.realpath(x))
return x or ''
import tempfile
path = abspath(path)
candidates = frozenset(map(abspath, ('/tmp', '/dev/shm', os.environ.get('TMPDIR', None), tempfile.gettempdir())))
for q in candidates:
if q and path.startswith(q):
return True
return False
def is_ok_to_read_image_file(path: str, fd: int) -> bool:
    '''
    Decide whether the file at path, which has already been opened as fd,
    may be read as image data.

    path and fd must refer to the same file. Nothing below /sys or /proc is
    acceptable, below /dev only entries inside /dev/shm are, and everywhere
    else the file must be a regular file.
    '''
    import stat
    path = os.path.abspath(os.path.realpath(path))
    try:
        path_stat = os.stat(path, follow_symlinks=True)
        fd_stat = os.fstat(fd)
    except OSError:
        return False
    if not os.path.samestat(path_stat, fd_stat):
        return False
    parts = path.split(os.sep)[1:]
    if not parts:
        return False
    top_level = parts[0]
    if top_level == 'dev':
        return len(parts) > 2 and parts[1] == 'shm'
    if top_level in ('sys', 'proc'):
        return False
    return stat.S_ISREG(fd_stat.st_mode)
def resolve_abs_or_config_path(path: str, env: Mapping[str, str] | None = None, conf_dir: str | None = None) -> str:
    '''
    Expand ~ and environment variables in path. A result that is not
    absolute is taken to be relative to conf_dir, or to the kitty config
    directory when conf_dir is not given.
    '''
    expanded = expandvars(os.path.expanduser(path), env or {})
    if os.path.isabs(expanded):
        return expanded
    return os.path.join(conf_dir or config_dir, expanded)
def resolve_custom_file(path: str) -> str:
    '''
    Resolve path with resolve_abs_or_config_path(), expanding variables from
    the env of the current options when options are available.
    '''
    try:
        opts: Options | None = get_options()
    except RuntimeError:
        opts = None
    env = opts.env if opts else {}
    return resolve_abs_or_config_path(path, env)
def func_name(f: Any) -> str:
    '''
    Return a name for f: its __name__, failing that the __name__ of its
    func attribute (as found on functools.partial objects), failing that
    str(f).
    '''
    try:
        return str(f.__name__)
    except AttributeError:
        pass
    try:
        return str(f.func.__name__)
    except AttributeError:
        return str(f)
def resolved_shell(opts: Options | None = None) -> list[str]:
    '''
    Return the command line of the shell to run.

    A shell setting of '.', which is also assumed when opts has no shell
    attribute, gives [shell_path]. Any other setting is split into words in
    which variables are expanded. For the expansion TERM is taken from opts
    and SHELL, HOME and USER are supplied if missing from os.environ.
    '''
    q: str = getattr(opts, 'shell', '.')
    if q == '.':
        return [shell_path]
    env = {}
    if opts is not None:
        env['TERM'] = opts.term
    if 'SHELL' not in os.environ:
        env['SHELL'] = shell_path
    if 'HOME' not in os.environ:
        env['HOME'] = os.path.expanduser('~')
    if 'USER' not in os.environ:
        import pwd
        env['USER'] = pwd.getpwuid(os.geteuid()).pw_name
    return [expandvars(word, env) for word in shlex_split(q)]
@run_once
def system_paths_on_macos() -> tuple[str, ...]:
    '''
    Return the directories listed in the files inside /etc/paths.d, taken in
    sorted file name order, followed by those listed in /etc/paths.

    Blank lines, lines starting with # and lines that do not name an
    existing directory are skipped. Each directory appears only once.
    '''
    # dicts preserve insertion order, so this is both the result and the
    # record of what has been seen already
    found: dict[str, None] = {}

    def add_from_file(x: str) -> None:
        try:
            f = open(x)
        except (FileNotFoundError, PermissionError):
            return
        with f:
            for raw_line in f:
                entry = raw_line.strip()
                if not entry or entry.startswith('#') or entry in found:
                    continue
                if os.path.isdir(entry):
                    found[entry] = None

    try:
        files = os.listdir('/etc/paths.d')
    except (FileNotFoundError, PermissionError):
        files = []
    for name in sorted(files):
        add_from_file(os.path.join('/etc/paths.d', name))
    add_from_file('/etc/paths')
    return tuple(found)
def which(name: str, only_system: bool = False) -> str | None:
    '''
    Find the executable name, returning its path or None when not found.

    A name containing a path separator is returned as is. Otherwise these
    locations are searched, in order, never looking in the same place twice:

    1. The entries of the exe_search_path option, the directories in PATH,
       ~/.local/bin, ~/bin and finally the exe_search_path entries that
       start with +. Entries starting with - are excluded from the search.
    2. A default set of system directories, in case PATH is messed up.
    3. Unless only_system is set or no options are available: the PATH from
       the environment of the shell and then the PATH from opts.env.
    '''
    if os.sep in name:
        return name
    import shutil

    opts: Options | None = None
    with suppress(RuntimeError):
        opts = get_options()

    def search(directories: Iterable[str]) -> str | None:
        return shutil.which(name, path=os.pathsep.join(directories))

    tried_paths = set()
    paths = []
    append_paths = []
    if opts and opts.exe_search_path:
        for entry in opts.exe_search_path:
            entry = entry.strip()
            if not entry:
                continue
            marker = entry[0]
            if marker == '-':
                tried_paths.add(os.path.expanduser(entry[1:]))
            elif marker == '+':
                append_paths.append(os.path.expanduser(entry[1:]))
            else:
                paths.append(os.path.expanduser(entry))
    ep = os.environ.get('PATH')
    if ep:
        paths.extend(ep.split(os.pathsep))
    paths += [os.path.expanduser('~/.local/bin'), os.path.expanduser('~/bin'), *append_paths]
    ans = search(x for x in paths if x not in tried_paths)
    if ans:
        return ans

    # In case PATH is messed up try a default set of paths
    if is_macos:
        system_paths = system_paths_on_macos()
    else:
        system_paths = ('/usr/local/bin', '/opt/bin', '/usr/bin', '/bin', '/usr/sbin', '/sbin')
    tried_paths |= set(paths)
    system_paths = tuple(x for x in system_paths if x not in tried_paths)
    if system_paths:
        ans = search(system_paths)
        if ans:
            return ans
        tried_paths |= set(system_paths)

    if only_system or opts is None:
        return None
    shell_env = read_shell_environment(opts)
    for xenv in (shell_env, opts.env):
        env_path = xenv.get('PATH')
        if not env_path:
            continue
        paths = [x for x in env_path.split(os.pathsep) if x not in tried_paths]
        ans = search(paths)
        if ans:
            return ans
        tried_paths |= set(paths)
    return None
def read_shell_environment(opts: Options | None = None) -> dict[str, str]:
    '''
    Return the environment variables defined by the shell of the user.

    The shell from resolved_shell() is run once, as a login and interactive
    shell attached to a pty, to execute env. Its output is parsed into
    NAME=value pairs, ignoring lines where either side is empty. At most
    about 1.5 seconds are spent waiting for the shell.

    The result is stored as an attribute of this function and returned by
    every later call. It is empty when the shell could not be found, timed
    out or exited with an error; each of these is logged.
    '''
    ans: dict[str, str] | None = getattr(read_shell_environment, 'ans', None)
    if ans is None:
        from .child import openpty
        ans = {}
        setattr(read_shell_environment, 'ans', ans)
        import subprocess
        shell = resolved_shell(opts)
        master, slave = openpty()
        os.set_blocking(master, False)
        if '-l' not in shell and '--login' not in shell:
            shell += ['-l']
        if '-i' not in shell and '--interactive' not in shell:
            shell += ['-i']
        try:
            p = subprocess.Popen(
                shell + ['-c', 'env'], stdout=slave, stdin=slave, stderr=slave, start_new_session=True, close_fds=True,
                preexec_fn=clear_handled_signals)
        except FileNotFoundError:
            # Nothing else owns the pty yet, so close it here or both of
            # its file descriptors are leaked
            os.close(master)
            os.close(slave)
            log_error('Could not find shell to read environment')
            return ans
        with os.fdopen(master, 'rb') as stdout, os.fdopen(slave, 'wb'):
            raw = b''
            from time import monotonic
            start_time = monotonic()
            while monotonic() - start_time < 1.5:
                try:
                    ret: int | None = p.wait(0.01)
                except subprocess.TimeoutExpired:
                    ret = None
                # The master is non-blocking, a read with no data available
                # fails here and is simply retried on the next iteration
                with suppress(Exception):
                    raw += stdout.read()
                if ret is not None:
                    break
            if cast(Optional[int], p.returncode) is None:
                log_error('Timed out waiting for shell to quit while reading shell environment')
                p.kill()
            elif p.returncode == 0:
                # Drain whatever output is still buffered in the pty
                while True:
                    try:
                        x = stdout.read()
                    except Exception:
                        break
                    if not x:
                        break
                    raw += x
                draw = raw.decode('utf-8', 'replace')
                for line in draw.splitlines():
                    k, v = line.partition('=')[::2]
                    if k and v:
                        ans[k] = v
            else:
                log_error('Failed to run shell to read its environment')
    return ans
def parse_uri_list(text: str) -> Generator[str, None, None]:
    '''
    Get paths from file:// URLs, one URL per line of text.

    Blank lines and lines starting with # are skipped. Lines that are not
    file:// URLs, or that cannot be parsed, are yielded unchanged. For
    file:// URLs the percent decoded path is yielded and URLs without a
    path are dropped.
    '''
    from urllib.parse import unquote, urlparse
    for line in text.splitlines():
        if not line or line.startswith('#'):
            continue
        if not line.startswith('file://'):
            yield line
            continue
        try:
            url_path = urlparse(line, allow_fragments=False).path
        except Exception:
            yield line
        else:
            if url_path:
                yield unquote(url_path)
def edit_config_file() -> None:
    '''
    Replace the current process with the editor from get_editor(), opened
    on the file returned by prepare_config_file_for_editing().
    '''
    from kitty.config import prepare_config_file_for_editing
    config_path = prepare_config_file_for_editing()
    argv = [*get_editor(), config_path]
    os.execvp(argv[0], argv)
class SSHConnectionData(NamedTuple):
    # Parameters describing an ssh connection.
    # NOTE(review): the meaning of the fields is inferred from their names
    # only, nothing in this file uses them. Confirm against the callers.
    binary: str  # presumably the ssh executable
    hostname: str
    port: int | None = None  # presumably None means the default port
    identity_file: str = ''
    extra_args: tuple[tuple[str, str], ...] = ()  # pairs of strings
def get_new_os_window_size(
    metrics: 'OSWindowSize', width: int, height: int, unit: str, incremental: bool = False, has_window_scaling: bool = True
) -> tuple[int, int]:
    '''
    Calculate the new size of an OS window from a requested width and height.

    With a unit of 'cells' the request is multiplied by the cell size from
    metrics and, when has_window_scaling is true, divided by the x and y
    scale and rounded. With incremental the request is added to the current
    size. Otherwise it replaces the current size, with a value of zero
    meaning keep the current value of that dimension.
    '''
    if unit == 'cells':
        width, height = width * metrics['cell_width'], height * metrics['cell_height']
        if has_window_scaling:
            width, height = round(width / metrics['xscale']), round(height / metrics['yscale'])
    if incremental:
        return metrics['width'] + width, metrics['height'] + height
    return width or metrics['width'], height or metrics['height']
def get_all_processes() -> Iterable[int]:
    '''
    Yield the ids of all processes: from the native get_all_processes() on
    macOS, from the numeric entries of /proc everywhere else.
    '''
    if is_macos:
        from kitty.fast_data_types import get_all_processes as f
        yield from f()
        return
    yield from (int(entry) for entry in os.listdir('/proc') if entry.isdigit())
def is_kitty_gui_cmdline(*cmd: str) -> bool:
    '''
    Return True if cmd looks like the command line of a kitty GUI process.

    The executable must be named kitty. A first argument starting with @
    rules the command line out, as does one starting with + unless it
    selects open, written either as +open or as + followed by open.
    '''
    if not cmd or os.path.basename(cmd[0]) != 'kitty':
        return False
    if len(cmd) == 1:
        return True
    first_arg = cmd[1]
    if first_arg.startswith('@'):
        return False
    if not first_arg.startswith('+'):
        return True
    if first_arg == '+':
        return len(cmd) > 2 and cmd[2] == 'open'
    return first_arg == '+open'
def reload_conf_in_all_kitties() -> None:
    '''
    Send SIGUSR1 to every running process whose command line is that of a
    kitty GUI instance (presumably this is the signal on which kitty
    reloads its configuration, as the name of this function suggests).

    This is best effort: processes whose command line cannot be read and
    processes that cannot be signalled are skipped.
    '''
    import signal
    from kitty.child import cmdline_of_pid
    for pid in get_all_processes():
        try:
            cmd = cmdline_of_pid(pid)
        except Exception:
            continue
        if cmd and is_kitty_gui_cmdline(*cmd):
            # The process may have exited since the list of processes was
            # made, or may belong to another user. Neither must prevent the
            # remaining instances from being signalled.
            with suppress(ProcessLookupError, PermissionError):
                os.kill(pid, signal.SIGUSR1)
@run_once
def control_codes_pat() -> 'Pattern[str]':
    '''
    Return the compiled pattern matching the control characters 0x00-0x1f,
    with the exception of newline (0x0a), and 0x7f-0x9f.
    '''
    character_class = '[\x00-\x09\x0b-\x1f\x7f-\x9f]'
    return re.compile(character_class)
def sanitize_control_codes(text: str, replace_with: str = '') -> str:
    '''
    Return text with every character matched by control_codes_pat()
    replaced by replace_with.
    '''
    return re.sub(control_codes_pat(), replace_with, text)
def hold_till_enter() -> None:
    '''Run the kitten executable with __hold_till_enter__ and wait for it to exit.'''
    import subprocess
    from .constants import kitten_exe
    holder = subprocess.Popen([kitten_exe(), '__hold_till_enter__'])
    holder.wait()
def cleanup_ssh_control_masters() -> None:
    '''
    Shut down the ssh control masters that belong to this process.

    Every file in the runtime directory that matches the control master
    template for the pid of this process is passed to ssh -O exit. All the
    ssh processes are started before any is waited for. Afterwards the
    files are removed, ignoring failures to do so.
    '''
    import glob
    import subprocess
    try:
        pattern = os.path.join(runtime_dir(), ssh_control_master_template.format(
            kitty_pid=os.getpid(), ssh_placeholder='*'))
        files = frozenset(glob.glob(pattern))
    except OSError:
        return
    null = subprocess.DEVNULL
    workers = [
        subprocess.Popen(
            ['ssh', '-o', f'ControlPath={x}', '-O', 'exit', 'kitty-unused-host-name'],
            stdout=null, stderr=null, preexec_fn=clear_handled_signals)
        for x in files
    ]
    for worker in workers:
        worker.wait()
    for x in files:
        try:
            os.remove(x)
        except OSError:
            pass
def path_from_osc7_url(url: str | bytes) -> str:
if isinstance(url, bytes):
url = url.decode('utf-8')
if url.startswith('kitty-shell-cwd://'):
return '/' + url.split('/', 3)[-1]
if url.startswith('file://'):
from urllib.parse import unquote, urlparse
return unquote(urlparse(url).path)
return ''
@run_once
def macos_version() -> tuple[int, ...]:
    '''
    Return the version reported by sw_vers -productVersion as a tuple of
    integers, or (0, 0, 0) when sw_vers cannot be run.
    '''
    # platform.mac_ver does not work thanks to Apple's stupid "hardening", so just use sw_vers
    import subprocess
    try:
        raw = subprocess.check_output(['sw_vers', '-productVersion'], stderr=subprocess.STDOUT)
        o = raw.decode()
    except Exception:
        return 0, 0, 0
    return tuple(int(part) for part in o.strip().split('.'))
@lru_cache(maxsize=2)
def less_version(less_exe: str = 'less') -> int:
    '''
    Return the version number printed at the start of the output of
    less_exe -V. Results are cached.

    Raises ValueError if the output does not start with the word less
    followed by a number. Errors from running less_exe are not caught.
    '''
    import subprocess
    output = subprocess.check_output([less_exe, '-V'], stderr=subprocess.STDOUT).decode()
    match = re.match(r'less (\d+)', output)
    if match is not None:
        return int(match.group(1))
    raise ValueError(f'Invalid version string for less: {output}')
def is_pid_alive(pid: int) -> bool:
    '''
    Return False if sending signal 0 to pid reports that there is no such
    process and True in every other case, including when the signal could
    not be sent for some other reason.
    '''
    try:
        os.kill(pid, 0)
    except ProcessLookupError:
        alive = False
    except Exception:
        alive = True
    else:
        alive = True
    return alive
def safer_fork() -> int:
    '''
    Fork the process, returning what os.fork() returns.

    In the parent 32 bytes from os.urandom() are fed to ssl.RAND_add(). In
    the child the registered atexit handlers are cleared.
    '''
    pid = os.fork()
    if pid == 0:
        # child
        import atexit
        atexit._clear()
    else:
        # master
        import ssl
        ssl.RAND_add(os.urandom(32), 0.0)
    return pid
def docs_url(which: str = '', local_docs_root: str | None = '') -> str:
    '''
    Return the URL of the documentation page which, which may carry a
    #fragment.

    A fragment of the form ref=NAME is resolved with resolve_ref(). If that
    gives an http or https URL, it is returned as is.

    The URL points into the local documentation when it is available and to
    the website otherwise. local_docs_root selects the local documentation:
    None disables it, the empty string means use local_docs() and anything
    else is used as its root directory.
    '''
    from urllib.parse import quote
    from .conf.types import resolve_ref
    from .constants import local_docs, website_url

    def split(target: str) -> tuple[str, str]:
        page, _, fragment = target.partition('#')
        return page.strip('/'), fragment

    ld = '' if local_docs_root is None else (local_docs_root or local_docs())
    base, frag = split(which)
    if frag.startswith('ref='):
        which = resolve_ref(frag[4:], lambda x: x)
        if which.startswith('https://') or which.startswith('http://'):
            return which
        base, frag = split(which)
    if ld:
        url = f'file://{ld}/' + quote(base or 'index') + '.html'
    else:
        url = website_url(base)
    return url + '#' + frag if frag else url
def sanitize_for_bracketed_paste(text: bytes) -> bytes:
    '''
    Remove every occurrence of ESC [ 201 ~ and of its single byte 0x9b
    variant from text.

    Removal is repeated until nothing changes, so that sequences which only
    come into existence through an earlier removal are removed as well.
    '''
    pat = re.compile(b'(?:(?:\033\\\x5b)|(?:\x9b))201~')
    while True:
        text, removed = pat.subn(b'', text)
        if not removed:
            return text
@lru_cache(maxsize=64)
def sanitize_url_for_display_to_user(url: str) -> str:
    '''
    Return url in a form suitable for showing to the user: the network
    location is IDNA encoded and the path is percent decoded.

    If the URL cannot be processed, the error is logged and the URL is
    returned prefixed with 'Unparseable URL: '.
    '''
    from urllib.parse import unquote, urlparse, urlunparse
    try:
        purl = urlparse(url)
        changes = {}
        if purl.netloc:
            changes['netloc'] = purl.netloc.encode('idna').decode('ascii')
        if purl.path:
            changes['path'] = unquote(purl.path)
        url = urlunparse(purl._replace(**changes))
    except Exception as e:
        log_error(e)
        url = 'Unparseable URL: ' + url
    return url
def extract_all_from_tarfile_safely(tf: 'tarfile.TarFile', dest: str) -> None:
    '''
    Extract all the members of tf into the directory dest, after ensuring
    that all extracted items are within dest.

    Raises ValueError, before anything is extracted, if the name of any
    member resolves to a location outside of dest.
    '''
    def is_within_directory(directory: str, target: str) -> bool:
        abs_directory = os.path.abspath(directory)
        abs_target = os.path.abspath(target)
        # Compare whole path components. A character by character prefix
        # test would accept /dest-evil/x as being inside /dest.
        return os.path.commonpath((abs_directory, abs_target)) == abs_directory

    members = tf.getmembers()
    for member in members:
        if not is_within_directory(dest, os.path.join(dest, member.name)):
            raise ValueError(f'Attempted path traversal in tar file: {member.name}')
    tf.extractall(dest, members, numeric_owner=False)
def is_png(path: str) -> bool:
    '''
    Return True if the file at path starts with the PNG signature. An empty
    path or a file that cannot be read gives False.
    '''
    if not path:
        return False
    try:
        with open(path, 'rb') as f:
            return f.read(8).startswith(b'\211PNG\r\n\032\n')
    except Exception:
        return False
def cmdline_for_hold(cmd: Sequence[str] = (), opts: Optional['Options'] = None) -> list[str]:
    '''
    Return a command line that runs cmd by way of kitten run-shell, with
    KITTY_HOLD=1 in the environment, using the shell and shell integration
    settings from opts.

    When opts is None the current options are used and, if there are none,
    the default options.
    '''
    if opts is None:
        with suppress(RuntimeError):
            opts = get_options()
        if opts is None:
            from .options.types import defaults
            opts = defaults
    import shlex
    shell_integration = ' '.join(opts.shell_integration)
    shell = shlex.join(resolved_shell(opts))
    prefix = [
        kitten_exe(), 'run-shell', f'--shell={shell}',
        f'--shell-integration={shell_integration}', '--env=KITTY_HOLD=1',
    ]
    return prefix + list(cmd)
def safe_mtime(path: str) -> float | None:
with suppress(OSError):
return os.path.getmtime(path)
return None
@run_once
def get_custom_window_icon() -> tuple[float, str] | tuple[None, None]:
    '''
    Return the modification time and the path of the custom window icon in
    the config directory, or (None, None) if there is none.

    The icon is named kitty.app.png. On macOS, kitty.app.icns is preferred
    to it.
    '''
    if is_macos:
        filenames = ['kitty.app.icns', 'kitty.app.png']
    else:
        filenames = ['kitty.app.png']
    for name in filenames:
        icon_path = os.path.join(config_dir, name)
        icon_mtime = safe_mtime(icon_path)
        if icon_mtime is None:
            continue
        return icon_mtime, icon_path
    return None, None
def key_val_matcher(items: Iterable[tuple[str, str]], key_pat: 're.Pattern[str]', val_pat: Optional['re.Pattern[str]']) -> bool:
    '''
    Return True if any (key, value) pair in items has a key in which
    key_pat is found and, when val_pat is not None, a value in which
    val_pat is found.
    '''
    def matches(key: str, val: str) -> bool:
        if key_pat.search(key) is None:
            return False
        return val_pat is None or val_pat.search(val) is not None

    return any(matches(key, val) for key, val in items)
def shlex_split(text: str, allow_ansi_quoted_strings: bool = False) -> Iterator[str]:
    '''Yield, one at a time, the words that Shlex produces for text.'''
    for word in Shlex(text, allow_ansi_quoted_strings):
        yield word
def shlex_split_with_positions(text: str, allow_ansi_quoted_strings: bool = False) -> Iterator[tuple[int, str]]:
    '''
    Yield the (number, word) pairs returned by Shlex.next_word() for text,
    stopping at the first pair whose number is not greater than -1.
    Going by the name of this function, the number is presumably the
    position of the word in text.
    '''
    lexer = Shlex(text, allow_ansi_quoted_strings)
    while True:
        item = lexer.next_word()
        if not item[0] > -1:
            return
        yield item
def timed_debug_print(*a: Any, sep: str = ' ', end: str = '\n') -> None:
    '''
    print() like front end to the native timed_debug_print: the arguments
    are converted to strings, joined with sep and followed by end.
    '''
    message = sep.join(str(part) for part in a) + end
    _timed_debug_print(message)
def lock_file(f: BinaryIO) -> None:
    '''
    Take an exclusive lock on f with fcntl.lockf().
    Raises ValueError if f is not writable.
    '''
    if f.writable():
        fcntl.lockf(f, fcntl.LOCK_EX)
        return
    raise ValueError('Cannot lock files not opened in writable mode')
def unlock_file(f: BinaryIO) -> None:
    '''
    Release the lock on f with fcntl.lockf().
    Raises ValueError if f is not writable.
    '''
    if f.writable():
        fcntl.lockf(f, fcntl.LOCK_UN)
        return
    raise ValueError('Cannot unlock files not opened in writable mode')