Files
kitty-mirror/shell-integration/ssh/bootstrap.py

332 lines
10 KiB
Python

#!/usr/bin/env python
# License: GPLv3 Copyright: 2022, Kovid Goyal <kovid at kovidgoyal.net>
import base64
import contextlib
import errno
import io
import json
import os
import pwd
import shutil
import subprocess
import sys
import tarfile
import tempfile
import termios
tty_file_obj = None
echo_on = int('ECHO_ON')
data_dir = shell_integration_dir = ''
request_data = int('REQUEST_DATA')
leading_data = b''
login_shell = os.environ.get('SHELL') or '/bin/sh'
try:
login_shell = pwd.getpwuid(os.geteuid()).pw_shell
except KeyError:
pass
export_home_cmd = b'EXPORT_HOME_CMD'
if export_home_cmd:
HOME = base64.standard_b64decode(export_home_cmd).decode('utf-8')
os.chdir(HOME)
else:
HOME = os.path.expanduser('~')
def set_echo(fd, on=False):
if fd < 0:
fd = sys.stdin.fileno()
old = termios.tcgetattr(fd)
new = termios.tcgetattr(fd)
if on:
new[3] |= termios.ECHO
else:
new[3] &= ~termios.ECHO
termios.tcsetattr(fd, termios.TCSANOW, new)
return fd, old
def cleanup():
global tty_file_obj
if tty_file_obj is not None:
if echo_on:
set_echo(tty_file_obj.fileno(), True)
tty_file_obj.close()
tty_file_obj = None
def write_all(fd, data):
if isinstance(data, str):
data = data.encode('utf-8')
data = memoryview(data)
while data:
try:
n = os.write(fd, data)
except BlockingIOError:
continue
if not n:
break
data = data[n:]
def dcs_to_kitty(payload, type='ssh'):
if isinstance(payload, str):
payload = payload.encode('utf-8')
payload = base64.standard_b64encode(payload)
return b'\033P@kitty-' + type.encode('ascii') + b'|' + payload + b'\033\\'
def send_data_request():
write_all(tty_file_obj.fileno(), dcs_to_kitty('id=REQUEST_ID:pwfile=PASSWORD_FILENAME:pw=DATA_PASSWORD'))
def debug(msg):
data = dcs_to_kitty('debug: {}'.format(msg), 'print')
if tty_file_obj is None:
with open(os.ctermid(), 'wb') as fl:
write_all(fl.fileno(), data)
else:
write_all(tty_file_obj.fileno(), data)
def apply_env_vars(raw):
global login_shell
def process_defn(defn):
parts = json.loads(defn)
if len(parts) == 1:
key, val = parts[0], ''
else:
key, val, literal_quote = parts
if not literal_quote:
val = os.path.expandvars(val)
os.environ[key] = val
for line in raw.splitlines():
val = line.split(' ', 1)[-1]
if line.startswith('export '):
process_defn(val)
elif line.startswith('unset '):
os.environ.pop(json.loads(val)[0], None)
login_shell = os.environ.pop('KITTY_LOGIN_SHELL', login_shell)
def move(src, base_dest):
for x in os.listdir(src):
path = os.path.join(src, x)
dest = os.path.join(base_dest, x)
if os.path.islink(path):
try:
os.unlink(dest)
except EnvironmentError:
pass
os.symlink(os.readlink(path), dest)
elif os.path.isdir(path):
if not os.path.exists(dest):
os.makedirs(dest)
move(path, dest)
else:
shutil.move(path, dest)
def compile_terminfo(base):
try:
tic = shutil.which('tic')
except AttributeError:
# python2
for x in os.environ.get('PATH', '').split(os.pathsep):
q = os.path.join(x, 'tic')
if os.access(q, os.X_OK) and os.path.isfile(q):
tic = q
break
else:
tic = ''
if not tic:
return
tname = '.terminfo'
q = os.path.join(base, tname, '78', 'xterm-kitty')
if not os.path.exists(q):
try:
os.makedirs(os.path.dirname(q))
except EnvironmentError as e:
if e.errno != errno.EEXIST:
raise
os.symlink('../x/xterm-kitty', q)
if os.path.exists('/usr/share/misc/terminfo.cdb'):
# NetBSD requires this
os.symlink('../../.terminfo.cdb', os.path.join(base, tname, 'x', 'xterm-kitty'))
tname += '.cdb'
os.environ['TERMINFO'] = os.path.join(HOME, tname)
p = subprocess.Popen(
[tic, '-x', '-o', os.path.join(base, tname), os.path.join(base, '.terminfo', 'kitty.terminfo')],
stdout=subprocess.PIPE, stderr=subprocess.STDOUT
)
output = p.stdout.read()
rc = p.wait()
if rc != 0:
getattr(sys.stderr, 'buffer', sys.stderr).write(output)
raise SystemExit('Failed to compile the terminfo database')
def iter_base64_data(f):
global leading_data
started = 0
while True:
line = f.readline().rstrip()
if started == 0:
if line == b'KITTY_DATA_START':
started = 1
else:
leading_data += line
elif started == 1:
if line == b'OK':
started = 2
else:
raise SystemExit(line.decode('utf-8', 'replace').rstrip())
else:
if line == b'KITTY_DATA_END':
break
yield line
@contextlib.contextmanager
def temporary_directory(dir, prefix):
# tempfile.TemporaryDirectory not available in python2
tdir = tempfile.mkdtemp(dir=dir, prefix=prefix)
try:
yield tdir
finally:
shutil.rmtree(tdir)
def get_data():
global data_dir, shell_integration_dir, leading_data
data = []
data = b''.join(iter_base64_data(tty_file_obj))
if leading_data:
# clear current line as it might have things echoed on it from leading_data
# because we only turn off echo in this script whereas the leading bytes could
# have been sent before the script had a chance to run
sys.stdout.write('\r\033[K')
data = base64.standard_b64decode(data)
with temporary_directory(dir=HOME, prefix='.kitty-ssh-kitten-untar-') as tdir, tarfile.open(fileobj=io.BytesIO(data)) as tf:
try:
# We have to use fully_trusted as otherwise it refuses to extract,
# for example, symlinks that point to absolute paths outside
# tdir.
tf.extractall(tdir, filter='fully_trusted')
except TypeError:
tf.extractall(tdir)
with open(tdir + '/data.sh') as f:
env_vars = f.read()
apply_env_vars(env_vars)
data_dir = os.environ.pop('KITTY_SSH_KITTEN_DATA_DIR')
if not os.path.isabs(data_dir):
data_dir = os.path.join(HOME, data_dir)
data_dir = os.path.abspath(data_dir)
shell_integration_dir = os.path.join(data_dir, 'shell-integration')
compile_terminfo(tdir + '/home')
move(tdir + '/home', HOME)
if os.path.exists(tdir + '/root'):
move(tdir + '/root', '/')
def exec_with_better_error(*a):
try:
os.execlp(*a)
except OSError as err:
if err.errno == errno.ENOENT:
raise SystemExit('The program: "' + a[0] + '" was not found')
raise
def exec_zsh_with_integration():
zdotdir = os.environ.get('ZDOTDIR') or ''
if not zdotdir:
zdotdir = HOME
os.environ.pop('KITTY_ORIG_ZDOTDIR', None) # ensure this is not propagated
else:
os.environ['KITTY_ORIG_ZDOTDIR'] = zdotdir
# dont prevent zsh-newuser-install from running
for q in ('.zshrc', '.zshenv', '.zprofile', '.zlogin'):
if os.path.exists(os.path.join(zdotdir, q)):
os.environ['ZDOTDIR'] = shell_integration_dir + '/zsh'
exec_with_better_error(login_shell, os.path.basename(login_shell), '-l')
os.environ.pop('KITTY_ORIG_ZDOTDIR', None) # ensure this is not propagated
def exec_fish_with_integration():
if not os.environ.get('XDG_DATA_DIRS'):
os.environ['XDG_DATA_DIRS'] = shell_integration_dir
else:
os.environ['XDG_DATA_DIRS'] = shell_integration_dir + ':' + os.environ['XDG_DATA_DIRS']
os.environ['KITTY_FISH_XDG_DATA_DIR'] = shell_integration_dir
exec_with_better_error(login_shell, os.path.basename(login_shell), '-l')
def exec_bash_with_integration():
os.environ['ENV'] = os.path.join(shell_integration_dir, 'bash', 'kitty.bash')
os.environ['KITTY_BASH_INJECT'] = '1'
if not os.environ.get('HISTFILE'):
os.environ['HISTFILE'] = os.path.join(HOME, '.bash_history')
os.environ['KITTY_BASH_UNEXPORT_HISTFILE'] = '1'
exec_with_better_error(login_shell, os.path.basename('login_shell'), '--posix')
def exec_with_shell_integration():
shell_name = os.path.basename(login_shell).lower()
if shell_name == 'zsh':
exec_zsh_with_integration()
if shell_name == 'fish':
exec_fish_with_integration()
if shell_name == 'bash':
exec_bash_with_integration()
def install_kitty_bootstrap():
kitty_remote = os.environ.pop('KITTY_REMOTE', '')
kitty_exists = shutil.which('kitty')
kitty_dir = os.path.join(data_dir, 'kitty', 'bin')
os.environ['SSH_KITTEN_KITTY_DIR'] = kitty_dir
if kitty_remote == 'yes' or (kitty_remote == 'if-needed' and not kitty_exists):
if kitty_exists:
os.environ['PATH'] = kitty_dir + os.pathsep + os.environ['PATH']
else:
os.environ['PATH'] = os.environ['PATH'] + os.pathsep + kitty_dir
def main():
global tty_file_obj, login_shell
# the value of O_CLOEXEC below is on macOS which is most likely to not have
# os.O_CLOEXEC being still stuck with python2
tty_file_obj = os.fdopen(os.open(os.ctermid(), os.O_RDWR | getattr(os, 'O_CLOEXEC', 16777216)), 'rb')
try:
if request_data:
set_echo(tty_file_obj.fileno(), on=False)
send_data_request()
get_data()
finally:
cleanup()
cwd = os.environ.pop('KITTY_LOGIN_CWD', '')
install_kitty_bootstrap()
if cwd:
try:
os.chdir(cwd)
except Exception as err:
print(f'Failed to change working directory to: {cwd} with error: {err}', file=sys.stderr)
ksi = frozenset(filter(None, os.environ.get('KITTY_SHELL_INTEGRATION', '').split()))
exec_cmd = b'EXEC_CMD'
if exec_cmd:
os.environ.pop('KITTY_SHELL_INTEGRATION', None)
cmd = base64.standard_b64decode(exec_cmd).decode('utf-8')
exec_with_better_error(login_shell, os.path.basename(login_shell), '-c', cmd)
TEST_SCRIPT # noqa
if ksi and 'no-rc' not in ksi:
exec_with_shell_integration()
os.environ.pop('KITTY_SHELL_INTEGRATION', None)
exec_with_better_error(login_shell, '-' + os.path.basename(login_shell))
main()