When saving session add option to save the foreground process running in the shell so that it is also restarted

Useful if user builds up session to save by running programs via
the shell.

Note that the serialization format for session files has changed
slightly, becoming more robust and allowing us to add more types
of saved data in the future, without overloading user_vars and thus
risking name conflicts.
This commit is contained in:
Kovid Goyal
2025-08-16 16:45:55 +05:30
parent dcc9ade7ae
commit f91a0f6986
7 changed files with 89 additions and 22 deletions

View File

@@ -34,7 +34,7 @@ default_pager_for_help = ('less', '-iRXF')
kitty_run_data: dict[str, Any] = getattr(sys, 'kitty_run_data', {})
launched_by_launch_services = kitty_run_data.get('launched_by_launch_services', False)
is_quick_access_terminal_app = kitty_run_data.get('is_quick_access_terminal_app', False)
serialize_user_var_name = 'kitty_serialize_window_id'
unserialize_launch_flag = 'kitty-unserialize-data='
if getattr(sys, 'frozen', False):
extensions_dir: str = kitty_run_data['extensions_dir']

View File

@@ -17,7 +17,7 @@ from .fast_data_types import add_timer, get_boss, get_options, get_os_window_tit
from .options.utils import env as parse_env
from .tabs import Tab, TabManager
from .types import LayerShellConfig, OverlayType, run_once
from .utils import get_editor, log_error, resolve_custom_file, which
from .utils import get_editor, log_error, resolve_custom_file, resolved_shell, which
from .window import CwdRequest, CwdRequestType, Watchers, Window
@@ -610,6 +610,7 @@ def _launch(
rc_from_window: Window | None = None,
base_env: dict[str, str] | None = None,
child_death_callback: Callable[[int, Exception | None], None] | None = None,
startup_command_via_shell_integration: Sequence[str] = (),
) -> Window | None:
source_window = boss.active_window_for_cwd
if opts.source_window:
@@ -775,6 +776,17 @@ def _launch(
tab = tab_for_window(boss, opts, target_tab, next_to)
watchers = load_watch_modules(opts.watcher)
with Window.set_ignore_focus_changes_for_new_windows(opts.keep_focus):
startup_command_env_added = False
if startup_command_via_shell_integration:
from .shell_integration import join
try:
scmd = kw.get('cmd') or resolved_shell(get_options())
env = env or {}
env['KITTY_SI_RUN_COMMAND_AT_STARTUP'] = join(scmd[0], startup_command_via_shell_integration)
startup_command_env_added = True
except Exception:
pass # shell is not a known shell
new_window: Window = tab.new_window(
env=env or None, watchers=watchers or None, is_clone_launch=is_clone_launch, next_to=next_to, **kw)
if child_death_callback is not None:
@@ -786,6 +798,10 @@ def _launch(
new_window.creation_spec = new_window.creation_spec._replace(spacing=tuple(opts.spacing))
if opts.color:
new_window.creation_spec = new_window.creation_spec._replace(colors=tuple(opts.color))
if startup_command_env_added and new_window.creation_spec.env:
def is_not_scmd(x: tuple[str, str]) -> bool:
return x[0] != 'KITTY_SI_RUN_COMMAND_AT_STARTUP'
new_window.creation_spec = new_window.creation_spec._replace(env=tuple(filter(is_not_scmd, new_window.creation_spec.env)))
if spacing:
patch_window_edges(new_window, spacing)
tab.relayout()
@@ -820,12 +836,15 @@ def launch(
rc_from_window: Window | None = None,
base_env: dict[str, str] | None = None,
child_death_callback: Callable[[int, Exception | None], None] | None = None,
startup_command_via_shell_integration: Sequence[str] = (),
) -> Window | None:
active = boss.active_window
if opts.keep_focus and active:
orig, active.ignore_focus_changes = active.ignore_focus_changes, True
try:
return _launch(boss, opts, args, target_tab, force_target_tab, is_clone_launch, rc_from_window, base_env, child_death_callback)
return _launch(
boss, opts, args, target_tab, force_target_tab, is_clone_launch, rc_from_window, base_env,
child_death_callback, startup_command_via_shell_integration)
finally:
if opts.keep_focus and active:
active.ignore_focus_changes = orig

View File

@@ -7,7 +7,6 @@ from itertools import repeat
from typing import Any, Callable, NamedTuple
from kitty.borders import BorderColor
from kitty.constants import serialize_user_var_name
from kitty.fast_data_types import Region, set_active_window, viewport_for_window
from kitty.options.types import Options
from kitty.types import Edges, WindowGeometry, WindowMapper
@@ -216,15 +215,11 @@ def distribute_indexed_bias(base_bias: Sequence[float], index_bias_map: dict[int
return normalize_biases(ans)
def create_window_id_map_for_unserialize(all_windows: WindowList, serialize_user_var_name: str = serialize_user_var_name) -> dict[int, int]:
def create_window_id_map_for_unserialize(all_windows: WindowList) -> dict[int, int]:
window_id_map = {}
for w in all_windows:
k = w.user_vars.pop(serialize_user_var_name, None)
if k is not None:
try:
window_id_map[int(k)] = w.id
except Exception:
pass
if w.serialized_id:
window_id_map[w.serialized_id] = w.id
return window_id_map

View File

@@ -12,7 +12,7 @@ from gettext import gettext as _
from typing import TYPE_CHECKING, Any, Optional, Sequence, Union
from .cli_stub import CLIOptions, SaveAsSessionOptions
from .constants import config_dir
from .constants import config_dir, unserialize_launch_flag
from .fast_data_types import get_options
from .layout.interface import all_layouts
from .options.types import Options
@@ -41,11 +41,13 @@ ResizeSpec = tuple[str, int]
class WindowSpec:
def __init__(self, launch_spec: Union['LaunchSpec', 'SpecialWindowInstance']):
def __init__(self, launch_spec: Union['LaunchSpec', 'SpecialWindowInstance'], serialized_id: int = 0, run_command_at_shell_startup: Sequence[str] = ()):
self.launch_spec = launch_spec
self.resize_spec: ResizeSpec | None = None
self.focus_matching_window_spec: str = ''
self.is_background_process = False
self.serialized_id = serialized_id
self.run_command_at_shell_startup = run_command_at_shell_startup
if hasattr(launch_spec, 'opts'): # LaunchSpec
from .launch import LaunchSpec
assert isinstance(launch_spec, LaunchSpec)
@@ -118,6 +120,10 @@ class Session:
if isinstance(cmd, str) and cmd:
needs_expandvars = True
cmd = list(shlex_split(cmd))
serialize_data: dict[str, Any] = {'id': 0, 'cmd_at_shell_startup': ()}
if cmd and cmd[0].startswith(unserialize_launch_flag):
serialize_data = json.loads(cmd[0][len(unserialize_launch_flag):])
del cmd[0]
spec = parse_launch_args(cmd)
if needs_expandvars:
assert isinstance(cmd, list)
@@ -132,7 +138,9 @@ class Session:
if t.next_title and not spec.opts.window_title:
spec.opts.window_title = t.next_title
spec.opts.cwd = spec.opts.cwd or t.cwd
t.windows.append(WindowSpec(spec))
t.windows.append(WindowSpec(
spec, serialized_id=serialize_data['id'],
run_command_at_shell_startup=serialize_data.get('cmd_at_shell_startup', ())))
t.next_title = None
if t.pending_resize_spec is not None:
t.windows[-1].resize_spec = t.pending_resize_spec
@@ -453,6 +461,18 @@ def save_as_session_options() -> str:
--save-only
type=bool-set
Only save the specified session file, dont open it in an editor to review after saving.
--use-foreground-process
type=bool-set
When saving windows that were started with the default shell but are currently running some
other process inside that shell, save that process so that when the session is used
both the shell :bold:`and` the process running inside it are re-started. This is most useful
when you have opened programs like editors or similar inside windows that started out running
the shell and you want to preserve that. WARNING: Be careful when using this option, if you are
running some dangerous command like :file:`rm` or :file:`mv` or similar in a shell, it will be re-run when
the session is executed if you use this option. Note that this option requires :ref:`shell_integration`
to work.
'''
@@ -461,7 +481,7 @@ def save_as_session_part2(boss: BossType, opts: SaveAsSessionOptions, path: str)
return
from .config import atomic_save
path = os.path.abspath(os.path.expanduser(path))
session = '\n'.join(boss.serialize_state_as_session())
session = '\n'.join(boss.serialize_state_as_session(opts))
atomic_save(session.encode(), path)
if not opts.save_only:
boss.edit_file(path)

View File

@@ -3,13 +3,16 @@
import os
import re
import subprocess
from collections.abc import Callable
from contextlib import suppress
from typing import Iterable
from .constants import shell_integration_dir
from .fast_data_types import get_options
from .options.types import Options, defaults
from .types import run_once
from .utils import log_error, which
@@ -182,6 +185,9 @@ ENV_SERIALIZERS: dict[str, Callable[[dict[str, str]], str]] = {
'fish': fish_serialize_env,
}
QUOTERES = {
'fish': as_fish_str_literal
}
def get_supported_shell_name(path: str) -> str | None:
name = os.path.basename(path)
@@ -205,6 +211,22 @@ def serialize_env(path: str, env: dict[str, str]) -> str:
return ENV_SERIALIZERS[name](env)
@run_once
def unsafe_pat() -> re.Pattern[str]:
return re.compile(r'[^\w@%+=:,./-]', re.ASCII)
def join(path: str, cmd: Iterable[str]) -> str:
name = get_supported_shell_name(path)
_find_unsafe = unsafe_pat().search
if not name:
raise ValueError(f'{path} is not a supported shell')
q = QUOTERES.get(name, as_str_literal)
def quote(x: str) -> str:
return x if _find_unsafe(x) is None else q(x)
return ' '.join(map(quote, cmd))
def get_effective_ksi_env_var(opts: Options | None = None) -> str:
opts = opts or get_options()
if 'disabled' in opts.shell_integration:

View File

@@ -257,9 +257,12 @@ class Tab: # {{{
self.new_special_window(spec)
else:
from .launch import launch
launched_window = launch(boss, spec.opts, spec.args, target_tab=target_tab, force_target_tab=True)
launched_window = launch(
boss, spec.opts, spec.args, target_tab=target_tab, force_target_tab=True,
startup_command_via_shell_integration=window.run_command_at_shell_startup)
if launched_window is not None:
launched_window.created_in_session_name = self.created_in_session_name
launched_window.serialized_id = window.serialized_id
if window.resize_spec is not None:
self.resize_window(*window.resize_spec)
if window.focus_matching_window_spec:

View File

@@ -34,7 +34,7 @@ from .constants import (
clear_handled_signals,
config_dir,
kitten_exe,
serialize_user_var_name,
unserialize_launch_flag,
wakeup_io_loop,
)
from .fast_data_types import (
@@ -654,6 +654,7 @@ class Window:
initial_ignore_focus_changes_context_manager_in_operation: bool = False
creation_spec: WindowCreationSpec | None = None
created_in_session_name: str = ''
serialized_id: int = 0
@classmethod
@contextmanager
@@ -1966,9 +1967,7 @@ class Window:
ans.append(f'--remote-control-password={shlex.join((pw,) + tuple(rcp_items))}')
if self.creation_spec:
if self.creation_spec.env:
env = dict(self.creation_spec.env)
env.pop('KITTY_PIPE_DATA', None)
for k, v in env.items():
for k, v in self.creation_spec.env:
if k not in ('KITTY_PIPE_DATA',):
ans.append(f'--env={k}={v}')
for cs in self.creation_spec.colors:
@@ -1980,7 +1979,6 @@ class Window:
if self.creation_spec.hold_after_ssh:
ans.append('--hold-after-ssh')
ans.extend(f'--var={k}={v}' for k, v in self.user_vars.items())
ans.append(f'--var={serialize_user_var_name}={self.id}')
ans.extend(self.padding.as_launch_args())
ans.extend(self.margin.as_launch_args('margin'))
if self.override_title:
@@ -2003,9 +2001,19 @@ class Window:
t = 'overlay-main' if self.overlay_type is OverlayType.main else 'overlay'
ans.append(f'--type={t}')
cmd: list[str] = []
if self.creation_spec and self.creation_spec.cmd:
if self.creation_spec.cmd != resolved_shell(get_options()):
ans.extend(self.creation_spec.cmd)
cmd = self.creation_spec.cmd
unserialize_data: dict[str, int | list[str]] = {'id': self.id}
if not cmd and ser_opts.use_foreground_process and self.child.pid != (pid := self.child.pid_for_cwd) and pid is not None:
# we have a shell running some command
with suppress(Exception):
fcmd = self.child.cmdline_of_pid(pid)
if fcmd:
unserialize_data['cmd_at_shell_startup'] = fcmd
ans.insert(1, unserialize_launch_flag + json.dumps(unserialize_data))
ans.extend(cmd)
return ans