#!/usr/bin/env python # License: GPL v3 Copyright: 2016, Kovid Goyal import fcntl import math import os import re import string import sys from collections.abc import Callable, Generator, Iterable, Iterator, Mapping, Sequence from contextlib import contextmanager, suppress from functools import lru_cache from re import Match, Pattern from types import MappingProxyType from typing import ( TYPE_CHECKING, Any, BinaryIO, NamedTuple, NoReturn, Optional, cast, ) from .constants import ( clear_handled_signals, config_dir, is_macos, is_wayland, kitten_exe, runtime_dir, shell_path, ssh_control_master_template, ) from .fast_data_types import WINDOW_FULLSCREEN, WINDOW_HIDDEN, WINDOW_MAXIMIZED, WINDOW_MINIMIZED, WINDOW_NORMAL, Color, Shlex, get_options, monotonic, open_tty from .fast_data_types import timed_debug_print as _timed_debug_print from .types import run_once from .typing_compat import AddressFamily, PopenType, StartupCtx if TYPE_CHECKING: import tarfile from .fast_data_types import OSWindowSize from .options.types import Options else: Options = object class Flag: def __init__(self, initial_val: bool = True) -> None: self.val = initial_val def __enter__(self) -> None: self.val ^= True def __exit__(self, *a: object) -> None: self.val ^= True def __bool__(self) -> bool: return self.val disallow_expand_vars = Flag(False) def expandvars(val: str, env: Mapping[str, str] = {}, fallback_to_os_env: bool = True) -> str: ''' Expand $VAR and ${VAR} Use $$ for a literal $ ''' def sub(m: 'Match[str]') -> str: key = m.group(1) or m.group(2) result = env.get(key) if result is None and fallback_to_os_env: result = os.environ.get(key) if result is None: result = m.group() return result if disallow_expand_vars or '$' not in val: return val return re.sub(r'\$(?:(\w+)|\{([^}]+)\})', sub, val.replace('$$', '\0')).replace('\0', '$') @lru_cache(maxsize=2) def sgr_sanitizer_pat(for_splitting: bool = False) -> 're.Pattern[str]': pat = '\033\\[.*?m' if for_splitting: return re.compile(f'({pat})') return re.compile(pat) @run_once def kitty_ansi_sanitizer_pat() -> 're.Pattern[str]': # removes ANSI sequences generated by kitty's ANSI output routines. Not # suitable for stripping general ANSI sequences return re.compile(r'\x1b(?:\[[0-9;:]*?m|\].*?\x1b\\)') def platform_window_id(os_window_id: int) -> int | None: if is_macos: from .fast_data_types import cocoa_window_id with suppress(Exception): return cocoa_window_id(os_window_id) if not is_wayland(): from .fast_data_types import x11_window_id with suppress(Exception): return x11_window_id(os_window_id) return None def safe_print(*a: Any, **k: Any) -> None: with suppress(Exception): print(*a, **k) def log_error(*a: Any, **k: str) -> None: from .fast_data_types import log_error_string output = getattr(log_error, 'redirect', log_error_string) with suppress(Exception): msg = k.get('sep', ' ').join(map(str, a)) + k.get('end', '') output(msg) @contextmanager def suppress_error_logging() -> Iterator[None]: before = getattr(log_error, 'redirect', suppress_error_logging) setattr(log_error, 'redirect', lambda *a: None) try: yield finally: if before is suppress_error_logging: delattr(log_error, 'redirect') else: setattr(log_error, 'redirect', before) def ceil_int(x: float) -> int: return int(math.ceil(x)) def sanitize_title(x: str) -> str: return re.sub(r'\s+', ' ', re.sub(r'[\0-\x19\x80-\x9f]', '', x)) def color_as_int(val: Color) -> int: return int(val) & 0xffffff def color_from_int(val: int) -> Color: return Color((val >> 16) & 0xFF, (val >> 8) & 0xFF, val & 0xFF) class ScreenSize(NamedTuple): rows: int cols: int width: int height: int cell_width: int cell_height: int def read_screen_size(fd: int = -1) -> ScreenSize: import array import fcntl import termios buf = array.array('H', [0, 0, 0, 0]) if fd < 0: fd = sys.stdout.fileno() fcntl.ioctl(fd, termios.TIOCGWINSZ, cast(bytearray, buf)) rows, cols, width, height = tuple(buf) cell_width, cell_height = width // (cols or 1), height // (rows or 1) return ScreenSize(rows, cols, width, height, cell_width, cell_height) class ScreenSizeGetter: changed = True Size = ScreenSize ans: ScreenSize | None = None def __init__(self, fd: int | None): if fd is None: fd = sys.stdout.fileno() self.fd = fd def __call__(self) -> ScreenSize: if self.changed: self.ans = read_screen_size(self.fd) self.changed = False return cast(ScreenSize, self.ans) @lru_cache(maxsize=64, typed=True) def screen_size_function(fd: int | None = None) -> ScreenSizeGetter: return ScreenSizeGetter(fd) def fit_image(width: int, height: int, pwidth: int, pheight: int) -> tuple[int, int]: from math import floor if height > pheight: corrf = pheight / float(height) width, height = floor(corrf * width), pheight if width > pwidth: corrf = pwidth / float(width) width, height = pwidth, floor(corrf * height) if height > pheight: corrf = pheight / float(height) width, height = floor(corrf * width), pheight return int(width), int(height) def base64_encode( integer: int, chars: str = string.ascii_uppercase + string.ascii_lowercase + string.digits + '+/' ) -> str: ans = '' while True: integer, remainder = divmod(integer, 64) ans = chars[remainder] + ans if integer == 0: break return ans def command_for_open(program: str | list[str] = 'default') -> list[str]: if isinstance(program, str): from .conf.utils import to_cmdline program = to_cmdline(program) if program == ['default']: cmd = ['open'] if is_macos else ['xdg-open'] else: cmd = program return cmd def open_cmd(cmd: Iterable[str] | list[str], arg: None | Iterable[str] | str = None, cwd: str | None = None, extra_env: dict[str, str] | None = None) -> 'PopenType[bytes]': import subprocess if arg is not None: cmd = list(cmd) if isinstance(arg, str): cmd.append(arg) else: cmd.extend(arg) env: dict[str, str] | None = None if extra_env: env = os.environ.copy() env.update(extra_env) return subprocess.Popen( tuple(cmd), stdin=subprocess.DEVNULL, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, cwd=cwd or None, preexec_fn=clear_handled_signals, env=env) def open_url(url: str, program: str | list[str] = 'default', cwd: str | None = None, extra_env: dict[str, str] | None = None) -> 'PopenType[bytes]': return open_cmd(command_for_open(program), url, cwd=cwd, extra_env=extra_env) def init_startup_notification_x11(window_handle: int, startup_id: str | None = None) -> Optional['StartupCtx']: # https://specifications.freedesktop.org/startup-notification-spec/startup-notification-latest.txt from kitty.fast_data_types import init_x11_startup_notification sid = startup_id or os.environ.pop('DESKTOP_STARTUP_ID', None) # ensure child processes don't get this env var if not sid: return None from .fast_data_types import x11_display display = x11_display() if not display: return None return init_x11_startup_notification(display, window_handle, sid) def end_startup_notification_x11(ctx: 'StartupCtx') -> None: from kitty.fast_data_types import end_x11_startup_notification end_x11_startup_notification(ctx) def init_startup_notification(window_handle: int | None, startup_id: str | None = None) -> Optional['StartupCtx']: if is_macos or is_wayland(): return None if window_handle is None: log_error('Could not perform startup notification as window handle not present') return None try: try: return init_startup_notification_x11(window_handle, startup_id) except OSError as e: if not str(e).startswith("Failed to load libstartup-notification"): raise e log_error( f'{e}. This has two main effects:', 'There will be no startup feedback and when using --single-instance, kitty windows may start on an incorrect desktop/workspace.') except Exception: import traceback traceback.print_exc() return None def end_startup_notification(ctx: Optional['StartupCtx']) -> None: if not ctx: return if is_macos or is_wayland(): return try: end_startup_notification_x11(ctx) except Exception: import traceback traceback.print_exc() class startup_notification_handler: # WARNING: This only works on X11 on other platforms extra_callback will be called # after the window is shown, not before, as they do not do two stage window # creation. def __init__(self, do_notify: bool = True, startup_id: str | None = None, extra_callback: Callable[[int], None] | None = None): self.do_notify = do_notify self.startup_id = startup_id self.extra_callback = extra_callback self.ctx: Optional['StartupCtx'] = None def __enter__(self) -> Callable[[int], None]: def pre_show_callback(window_handle: int) -> None: if self.extra_callback is not None: self.extra_callback(window_handle) if self.do_notify: self.ctx = init_startup_notification(window_handle, self.startup_id) return pre_show_callback def __exit__(self, *a: Any) -> None: if self.ctx is not None: end_startup_notification(self.ctx) def unix_socket_directories() -> Iterator[str]: import tempfile home = os.path.expanduser('~') candidates = [tempfile.gettempdir(), home] if is_macos: from .fast_data_types import user_cache_dir candidates = [user_cache_dir(), '/Library/Caches'] else: if os.environ.get('XDG_RUNTIME_DIR'): candidates.insert(0, os.environ['XDG_RUNTIME_DIR']) for loc in candidates: if os.access(loc, os.W_OK | os.R_OK | os.X_OK): yield loc def unix_socket_paths(name: str, ext: str = '.lock') -> Generator[str, None, None]: home = os.path.expanduser('~') for loc in unix_socket_directories(): filename = ('.' if loc == home else '') + name + ext yield os.path.join(loc, filename) def parse_address_spec(spec: str) -> tuple[AddressFamily, tuple[str, int] | str, str | None]: import socket try: protocol, rest = spec.split(':', 1) except ValueError: raise ValueError(f'Invalid listen-on value: {spec} must be of the form protocol:address') socket_path = None address: str | tuple[str, int] = '' if protocol == 'unix': family = socket.AF_UNIX address = rest if address.startswith('@') and len(address) > 1: address = '\0' + address[1:] else: socket_path = address elif protocol in ('tcp', 'tcp6'): family = socket.AF_INET if protocol == 'tcp' else socket.AF_INET6 if rest.startswith('['): # ] host = rest[1:] host, sep, leftover = host.rpartition(']') _, port = leftover.rsplit(':', 1) if ':' in host and protocol == 'tcp': family = socket.AF_INET6 else: host, port = rest.rsplit(':', 1) address = host, int(port) else: raise ValueError(f'Unknown protocol in listen-on value: {spec}') return family, address, socket_path def parse_os_window_state(state: str) -> int: match state: case 'normal': return WINDOW_NORMAL case 'maximized': return WINDOW_MAXIMIZED case 'minimized': return WINDOW_MINIMIZED case 'fullscreen' | 'fullscreened': return WINDOW_FULLSCREEN case 'hidden': return WINDOW_HIDDEN case _: return WINDOW_NORMAL def write_all(fd: int, data: str | bytes, block_until_written: bool = True) -> None: if isinstance(data, str): data = data.encode('utf-8') mvd = memoryview(data) while len(mvd) > 0: try: n = os.write(fd, mvd) except BlockingIOError: if not block_until_written: raise continue if not n: break mvd = mvd[n:] class TTYIO: def __init__(self, read_with_timeout: bool = True): self.read_with_timeout = read_with_timeout def __enter__(self) -> 'TTYIO': self.tty_fd, self.original_termios = open_tty(self.read_with_timeout) return self def __exit__(self, *a: Any) -> None: from .fast_data_types import close_tty close_tty(self.tty_fd, self.original_termios) def wait_till_read_available(self) -> bool: if self.read_with_timeout: raise ValueError('Cannot wait when TTY is set to read with timeout') import select rd = select.select([self.tty_fd], [], [])[0] return bool(rd) def read(self, limit: int) -> bytes: return os.read(self.tty_fd, limit) def send(self, data: str | bytes | Iterable[str | bytes]) -> None: if isinstance(data, (str, bytes)): write_all(self.tty_fd, data) else: for chunk in data: write_all(self.tty_fd, chunk) def recv(self, more_needed: Callable[[bytes], bool], timeout: float, sz: int = 1) -> None: fd = self.tty_fd start_time = monotonic() while timeout > monotonic() - start_time: # will block for 0.1 secs waiting for data because we have set # VMIN=0 VTIME=1 in termios data = os.read(fd, sz) if data and not more_needed(data): break def set_echo(fd: int = -1, on: bool = False) -> tuple[int, list[int | list[bytes | int]]]: import termios if fd < 0: fd = sys.stdin.fileno() old = termios.tcgetattr(fd) new = termios.tcgetattr(fd) if on: new[3] |= termios.ECHO else: new[3] &= ~termios.ECHO termios.tcsetattr(fd, termios.TCSADRAIN, new) return fd, old @contextmanager def no_echo(fd: int = -1) -> Iterator[None]: import termios fd, old = set_echo(fd) try: yield finally: termios.tcsetattr(fd, termios.TCSADRAIN, old) def natsort_ints(iterable: Iterable[str]) -> list[str]: def convert(text: str) -> int | str: return int(text) if text.isdigit() else text def alphanum_key(key: str) -> tuple[int | str, ...]: return tuple(map(convert, re.split(r'(\d+)', key))) return sorted(iterable, key=alphanum_key) def get_hostname(fallback: str = '') -> str: import socket try: return socket.gethostname() or fallback except Exception: return fallback def resolve_editor_cmd(editor: str, shell_env: Mapping[str, str]) -> str | None: import shlex editor_cmd = list(shlex_split(editor)) editor_exe = (editor_cmd or ('',))[0] if editor_exe and os.path.isabs(editor_exe): return editor if not editor_exe: return None def patched(exe: str) -> str: editor_cmd[0] = exe return ' '.join(map(shlex.quote, editor_cmd)) if shell_env is os.environ: q = which(editor_exe, only_system=True) if q: return patched(q) elif 'PATH' in shell_env: import shutil q = shutil.which(editor_exe, path=shell_env['PATH']) if q: return patched(q) return None def get_editor_from_env(env: Mapping[str, str]) -> str | None: for var in ('VISUAL', 'EDITOR'): editor = env.get(var) if editor: editor = resolve_editor_cmd(editor, env) if editor: return editor return None def get_editor_from_env_vars(opts: Options | None = None) -> list[str]: from .child import default_env editor = get_editor_from_env(default_env()) if not editor: shell_env = read_shell_environment(opts) editor = get_editor_from_env(shell_env) for ans in (editor, 'vim', 'nvim', 'vi', 'emacs', 'hx', 'kak', 'micro', 'nano', 'vis'): if ans and which(next(shlex_split(ans)), only_system=True): break else: ans = 'vim' return list(shlex_split(ans)) def get_editor(opts: Options | None = None, path_to_edit: str = '', line_number: int = 0) -> list[str]: if opts is None: try: opts = get_options() except RuntimeError: # we are in a kitten from .cli import create_default_opts opts = create_default_opts() if opts.editor == '.': ans = get_editor_from_env_vars() else: ans = list(shlex_split(opts.editor)) ans[0] = os.path.expanduser(ans[0]) if path_to_edit: if line_number: eq = os.path.basename(ans[0]).lower() if eq in ('code', 'code.exe'): path_to_edit += f':{line_number}' ans.append('--goto') else: ans.append(f'+{line_number}') ans.append(path_to_edit) return ans def edit_file(path: str = '') -> NoReturn: ' This exists for: map whatever launch kitty +runpy "from kitty.utils import *; edit_file()" to edit kitty config ' from .config import prepare_config_file_for_editing editor = get_editor() path = path or prepare_config_file_for_editing() editor.append(path) os.execlp(editor[0], *editor) def is_path_in_temp_dir(path: str) -> bool: if not path: return False def abspath(x: str | None) -> str: if x: x = os.path.abspath(os.path.realpath(x)) return x or '' import tempfile path = abspath(path) candidates = frozenset(map(abspath, ('/tmp', '/dev/shm', os.environ.get('TMPDIR', None), tempfile.gettempdir()))) for q in candidates: if q and path.startswith(q): return True return False def is_ok_to_read_image_file(path: str, fd: int) -> bool: import stat path = os.path.abspath(os.path.realpath(path)) try: path_stat = os.stat(path, follow_symlinks=True) fd_stat = os.fstat(fd) except OSError: return False if not os.path.samestat(path_stat, fd_stat): return False parts = path.split(os.sep)[1:] if len(parts) < 1: return False if parts[0] in ('sys', 'proc', 'dev'): if parts[0] == 'dev': return len(parts) > 2 and parts[1] == 'shm' return False return stat.S_ISREG(fd_stat.st_mode) def resolve_abs_or_config_path(path: str, env: Mapping[str, str] | None = None, conf_dir: str | None = None) -> str: path = os.path.expanduser(path) path = expandvars(path, env or {}) if not os.path.isabs(path): path = os.path.join(conf_dir or config_dir, path) return path def resolve_custom_file(path: str) -> str: opts: Options | None = None with suppress(RuntimeError): opts = get_options() return resolve_abs_or_config_path(path, opts.env if opts else {}) def func_name(f: Any) -> str: if hasattr(f, '__name__'): return str(f.__name__) if hasattr(f, 'func') and hasattr(f.func, '__name__'): return str(f.func.__name__) return str(f) def resolved_shell(opts: Options | None = None) -> list[str]: q: str = getattr(opts, 'shell', '.') if q == '.': ans = [shell_path] else: env = {} if opts is not None: env['TERM'] = opts.term if 'SHELL' not in os.environ: env['SHELL'] = shell_path if 'HOME' not in os.environ: env['HOME'] = os.path.expanduser('~') if 'USER' not in os.environ: import pwd env['USER'] = pwd.getpwuid(os.geteuid()).pw_name def expand(x: str) -> str: return expandvars(x, env) ans = list(map(expand, shlex_split(q))) return ans @run_once def system_paths_on_macos() -> tuple[str, ...]: entries, seen = [], set() def add_from_file(x: str) -> None: try: f = open(x) except (FileNotFoundError, PermissionError): return with f: for line in f: line = line.strip() if line and not line.startswith('#') and line not in seen: if os.path.isdir(line): seen.add(line) entries.append(line) try: files = os.listdir('/etc/paths.d') except (FileNotFoundError, PermissionError): files = [] for name in sorted(files): add_from_file(os.path.join('/etc/paths.d', name)) add_from_file('/etc/paths') return tuple(entries) def which(name: str, only_system: bool = False) -> str | None: if os.sep in name: return name import shutil opts: Options | None = None with suppress(RuntimeError): opts = get_options() tried_paths = set() paths = [] append_paths = [] if opts and opts.exe_search_path: for x in opts.exe_search_path: x = x.strip() if x: if x[0] == '-': tried_paths.add(os.path.expanduser(x[1:])) elif x[0] == '+': append_paths.append(os.path.expanduser(x[1:])) else: paths.append(os.path.expanduser(x)) ep = os.environ.get('PATH') if ep: paths.extend(ep.split(os.pathsep)) paths.append(os.path.expanduser('~/.local/bin')) paths.append(os.path.expanduser('~/bin')) paths.extend(append_paths) ans = shutil.which(name, path=os.pathsep.join(x for x in paths if x not in tried_paths)) if ans: return ans # In case PATH is messed up try a default set of paths if is_macos: system_paths = system_paths_on_macos() else: system_paths = ('/usr/local/bin', '/opt/bin', '/usr/bin', '/bin', '/usr/sbin', '/sbin') tried_paths |= set(paths) system_paths = tuple(x for x in system_paths if x not in tried_paths) if system_paths: ans = shutil.which(name, path=os.pathsep.join(system_paths)) if ans: return ans tried_paths |= set(system_paths) if only_system or opts is None: return None shell_env = read_shell_environment(opts) for xenv in (shell_env, opts.env): q = xenv.get('PATH') if q: paths = [x for x in xenv['PATH'].split(os.pathsep) if x not in tried_paths] ans = shutil.which(name, path=os.pathsep.join(paths)) if ans: return ans tried_paths |= set(paths) return None @lru_cache(4) def read_resolved_shell_environment(shell: tuple[str, ...]) -> MappingProxyType[str, str]: import subprocess cmdline = list(shell) if '-l' not in cmdline and '--login' not in cmdline: cmdline += ['-l'] if '-i' not in cmdline and '--interactive' not in cmdline: cmdline += ['-i'] q = os.path.basename(cmdline[0]).lower() has_builtin = q in ('bash', 'zsh') cmd = 'builtin command env -0' if has_builtin else 'command env -0' ans: MappingProxyType[str, str] = MappingProxyType({}) from .child import openpty master, slave = openpty() os.set_blocking(master, False) try: p = subprocess.Popen( cmdline + ['-c', cmd], stdout=slave, stdin=slave, stderr=slave, start_new_session=True, close_fds=True, preexec_fn=clear_handled_signals) except FileNotFoundError: log_error(f'Could not find shell {cmdline[0]} to read environment') return ans with os.fdopen(master, 'rb') as stdout, os.fdopen(slave, 'wb'): raw = b'' from time import monotonic start_time = monotonic() ret: int | None = None while monotonic() - start_time < 1.5: try: ret = p.wait(0.01) except subprocess.TimeoutExpired: ret = None with suppress(Exception): raw += stdout.read() if ret is not None: break if ret is None: log_error(f'Timed out waiting for shell {cmdline} to quit while reading shell environment') p.kill() elif ret == 0: while True: try: x = stdout.read() except Exception: break if not x: break raw += x draw = raw.decode('utf-8', 'replace') env = {} for line in draw.split('\0'): k, sep, v = line.partition('=') if k and v and sep: env[k] = v ans = MappingProxyType(env) else: log_error(f'Failed to run shell {cmdline} to read its environment') return ans def read_shell_environment(opts: Options | None = None) -> MappingProxyType[str, str]: shell = resolved_shell(opts) return read_resolved_shell_environment(tuple(shell)) def parse_uri_list(text: str) -> Generator[str, None, None]: ' Get paths from file:// URLs ' from urllib.parse import unquote, urlparse for line in text.splitlines(): if not line or line.startswith('#'): continue if not line.startswith('file://'): yield line continue try: purl = urlparse(line, allow_fragments=False) except Exception: yield line continue if purl.path: yield unquote(purl.path) def edit_config_file() -> None: from kitty.config import prepare_config_file_for_editing p = prepare_config_file_for_editing() editor = get_editor() os.execvp(editor[0], editor + [p]) class SSHConnectionData(NamedTuple): binary: str hostname: str port: int | None = None identity_file: str = '' extra_args: tuple[tuple[str, str], ...] = () def get_new_os_window_size( metrics: 'OSWindowSize', width: int, height: int, unit: str, incremental: bool = False, has_window_scaling: bool = True ) -> tuple[int, int]: if unit == 'cells': cw = metrics['cell_width'] ch = metrics['cell_height'] width *= cw height *= ch if has_window_scaling: width = round(width / metrics['xscale']) height = round(height / metrics['yscale']) if incremental: w = metrics['width'] + width h = metrics['height'] + height else: w = width or metrics['width'] h = height or metrics['height'] return w, h def get_all_processes() -> Iterable[int]: if is_macos: from kitty.fast_data_types import get_all_processes as f yield from f() else: for c in os.listdir('/proc'): if c.isdigit(): yield int(c) def is_kitty_gui_cmdline(*cmd: str) -> bool: if not cmd: return False if os.path.basename(cmd[0]) != 'kitty': return False if len(cmd) == 1: return True s = cmd[1][:1] if s == '@': return False if s == '+': if cmd[1] == '+': return len(cmd) > 2 and cmd[2] == 'open' return cmd[1] == '+open' return True def reload_conf_in_all_kitties() -> None: import signal from kitty.child import cmdline_of_pid for pid in get_all_processes(): try: cmd = cmdline_of_pid(pid) except Exception: continue if cmd and is_kitty_gui_cmdline(*cmd): os.kill(pid, signal.SIGUSR1) @run_once def control_codes_pat() -> 'Pattern[str]': return re.compile('[\x00-\x09\x0b-\x1f\x7f-\x9f]') def sanitize_control_codes(text: str, replace_with: str = '') -> str: return control_codes_pat().sub(replace_with, text) def hold_till_enter() -> None: import subprocess from .constants import kitten_exe subprocess.Popen([kitten_exe(), '__hold_till_enter__']).wait() def cleanup_ssh_control_masters() -> None: import glob import subprocess try: files = frozenset(glob.glob(os.path.join(runtime_dir(), ssh_control_master_template.format( kitty_pid=os.getpid(), ssh_placeholder='*')))) except OSError: return workers = tuple(subprocess.Popen([ 'ssh', '-o', f'ControlPath={x}', '-O', 'exit', 'kitty-unused-host-name'], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, preexec_fn=clear_handled_signals) for x in files) for w in workers: w.wait() for x in files: with suppress(OSError): os.remove(x) def path_from_osc7_url(url: str | bytes) -> str: if isinstance(url, bytes): url = url.decode('utf-8') if url.startswith('kitty-shell-cwd://'): return '/' + url.split('/', 3)[-1] if url.startswith('file://'): from urllib.parse import unquote, urlparse return unquote(urlparse(url).path) return '' @run_once def macos_version() -> tuple[int, ...]: # platform.mac_ver does not work thanks to Apple's stupid "hardening", so just use sw_vers import subprocess try: o = subprocess.check_output(['sw_vers', '-productVersion'], stderr=subprocess.STDOUT).decode() except Exception: return 0, 0, 0 return tuple(map(int, o.strip().split('.'))) @lru_cache(maxsize=2) def less_version(less_exe: str = 'less') -> int: import subprocess o = subprocess.check_output([less_exe, '-V'], stderr=subprocess.STDOUT).decode() m = re.match(r'less (\d+)', o) if m is None: raise ValueError(f'Invalid version string for less: {o}') return int(m.group(1)) def is_pid_alive(pid: int) -> bool: try: os.kill(pid, 0) except ProcessLookupError: return False except Exception: pass return True def safer_fork() -> int: pid = os.fork() if pid: # master import ssl ssl.RAND_add(os.urandom(32), 0.0) else: # child import atexit atexit._clear() return pid def docs_url(which: str = '', local_docs_root: str | None = '') -> str: from urllib.parse import quote from .conf.types import resolve_ref from .constants import local_docs, website_url if local_docs_root is None: ld = '' else: ld = local_docs_root or local_docs() base, frag = which.partition('#')[::2] base = base.strip('/') if frag.startswith('ref='): ref = frag[4:] which = resolve_ref(ref, lambda x: x) if which.startswith('https://') or which.startswith('http://'): return which base, frag = which.partition('#')[::2] base = base.strip('/') if ld: base = base or 'index' url = f'file://{ld}/' + quote(base) + '.html' else: url = website_url(base) if frag: url += '#' + frag return url def sanitize_for_bracketed_paste(text: bytes) -> bytes: pat = re.compile(b'(?:(?:\033\\\x5b)|(?:\x9b))201~') while True: new_text = pat.sub(b'', text) if new_text == text: break text = new_text return text @lru_cache(maxsize=64) def sanitize_url_for_display_to_user(url: str) -> str: from urllib.parse import unquote, urlparse, urlunparse try: purl = urlparse(url) if purl.netloc: purl = purl._replace(netloc=purl.netloc.encode('idna').decode('ascii')) if purl.path: purl = purl._replace(path=unquote(purl.path)) url = urlunparse(purl) except Exception as e: log_error(e) url = 'Unparseable URL: ' + url return url def extract_all_from_tarfile_safely(tf: 'tarfile.TarFile', dest: str) -> None: # Ensure that all extracted items are within dest def is_within_directory(directory: str, target: str) -> bool: abs_directory = os.path.abspath(directory) abs_target = os.path.abspath(target) prefix = os.path.commonprefix((abs_directory, abs_target)) return prefix == abs_directory def safe_extract(tar: 'tarfile.TarFile', path: str = ".", numeric_owner: bool = False) -> None: for member in tar.getmembers(): member_path = os.path.join(path, member.name) if not is_within_directory(path, member_path): raise ValueError(f'Attempted path traversal in tar file: {member.name}') tar.extractall(path, tar.getmembers(), numeric_owner=numeric_owner) safe_extract(tf, dest) def is_png(path: str) -> bool: if path: with suppress(Exception), open(path, 'rb') as f: header = f.read(8) return header.startswith(b'\211PNG\r\n\032\n') return False def cmdline_for_hold(cmd: Sequence[str] = (), opts: Optional['Options'] = None) -> list[str]: if opts is None: with suppress(RuntimeError): opts = get_options() if opts is None: from .options.types import defaults opts = defaults ksi = ' '.join(opts.shell_integration) import shlex shell = shlex.join(resolved_shell(opts)) return [kitten_exe(), 'run-shell', f'--shell={shell}', f'--shell-integration={ksi}', '--env=KITTY_HOLD=1'] + list(cmd) def safe_mtime(path: str) -> float | None: with suppress(OSError): return os.path.getmtime(path) return None @run_once def get_custom_window_icon() -> tuple[float, str] | tuple[None, None]: filenames = ['kitty.app.png'] if is_macos: # On macOS, prefer icns to png. filenames.insert(0, 'kitty.app.icns') for name in filenames: custom_icon_path = os.path.join(config_dir, name) custom_icon_mtime = safe_mtime(custom_icon_path) if custom_icon_mtime is not None: return custom_icon_mtime, custom_icon_path return None, None def key_val_matcher(items: Iterable[tuple[str, str]], key_pat: 're.Pattern[str]', val_pat: Optional['re.Pattern[str]']) -> bool: for key, val in items: if key_pat.search(key) is not None and ( val_pat is None or val_pat.search(val) is not None): return True return False def shlex_split(text: str, allow_ansi_quoted_strings: bool = False) -> Iterator[str]: yield from Shlex(text, allow_ansi_quoted_strings) def shlex_split_with_positions(text: str, allow_ansi_quoted_strings: bool = False) -> Iterator[tuple[int, str]]: s = Shlex(text, allow_ansi_quoted_strings) while (q := s.next_word())[0] > -1: yield q def timed_debug_print(*a: Any, sep: str = ' ', end: str = '\n') -> None: _timed_debug_print(sep.join(map(str, a)) + end) def lock_file(f: BinaryIO) -> None: if not f.writable(): raise ValueError('Cannot lock files not opened in writable mode') fcntl.lockf(f, fcntl.LOCK_EX) def unlock_file(f: BinaryIO) -> None: if not f.writable(): raise ValueError('Cannot unlock files not opened in writable mode') fcntl.lockf(f, fcntl.LOCK_UN)