mirror of
https://github.com/kovidgoyal/kitty.git
synced 2025-12-13 20:36:22 +01:00
519 lines
19 KiB
Python
519 lines
19 KiB
Python
#!/usr/bin/env python
|
|
# License: GPL v3 Copyright: 2018, Kovid Goyal <kovid at kovidgoyal.net>
|
|
|
|
import base64
|
|
import json
|
|
import os
|
|
import re
|
|
import sys
|
|
from collections.abc import Iterable, Iterator, Sequence
|
|
from contextlib import suppress
|
|
from functools import lru_cache, partial
|
|
from time import time_ns
|
|
from types import GeneratorType
|
|
from typing import (
|
|
TYPE_CHECKING,
|
|
Any,
|
|
Optional,
|
|
cast,
|
|
)
|
|
|
|
from .cli import parse_args
|
|
from .cli_stub import RCOptions
|
|
from .constants import RC_ENCRYPTION_PROTOCOL_VERSION, appname, version
|
|
from .fast_data_types import (
|
|
AES256GCMDecrypt,
|
|
AES256GCMEncrypt,
|
|
EllipticCurveKey,
|
|
get_boss,
|
|
get_options,
|
|
monotonic,
|
|
read_command_response,
|
|
send_data_to_peer,
|
|
)
|
|
from .rc.base import NoResponse, PayloadGetter, all_command_names, command_for_name
|
|
from .types import AsyncResponse
|
|
from .typing_compat import BossType, WindowType
|
|
from .utils import TTYIO, log_error, parse_address_spec, resolve_custom_file
|
|
|
|
active_async_requests: dict[str, float] = {}
|
|
active_streams: dict[str, str] = {}
|
|
if TYPE_CHECKING:
|
|
from .window import Window
|
|
|
|
|
|
def encode_response_for_peer(response: Any) -> bytes:
|
|
return b'\x1bP@kitty-cmd' + json.dumps(response).encode('utf-8') + b'\x1b\\'
|
|
|
|
|
|
def parse_cmd(serialized_cmd: memoryview, encryption_key: EllipticCurveKey) -> dict[str, Any]:
|
|
# See https://github.com/python/cpython/issues/74379 for why we cant use
|
|
# memoryview directly :((
|
|
try:
|
|
pcmd = json.loads(bytes(serialized_cmd))
|
|
except Exception:
|
|
log_error('Failed to parse JSON payload of remote command, ignoring it')
|
|
return {}
|
|
if not isinstance(pcmd, dict) or 'version' not in pcmd:
|
|
log_error('JSON payload of remote command is invalid, must be an object with a version field')
|
|
return {}
|
|
pcmd.pop('password', None)
|
|
if 'encrypted' in pcmd:
|
|
if pcmd.get('enc_proto', '1') != RC_ENCRYPTION_PROTOCOL_VERSION:
|
|
log_error(f'Ignoring encrypted rc command with unsupported protocol: {pcmd.get("enc_proto")}')
|
|
return {}
|
|
pubkey = pcmd.get('pubkey', '')
|
|
if not pubkey:
|
|
log_error('Ignoring encrypted rc command without a public key')
|
|
d = AES256GCMDecrypt(encryption_key.derive_secret(base64.b85decode(pubkey)), base64.b85decode(pcmd['iv']), base64.b85decode(pcmd['tag']))
|
|
data = d.add_data_to_be_decrypted(base64.b85decode(pcmd['encrypted']), True)
|
|
pcmd = json.loads(data)
|
|
if not isinstance(pcmd, dict) or 'version' not in pcmd:
|
|
return {}
|
|
delta = time_ns() - pcmd.pop('timestamp')
|
|
if abs(delta) > 5 * 60 * 1e9:
|
|
log_error(
|
|
f'Ignoring encrypted rc command with timestamp {delta / 1e9:.1f} seconds from now.'
|
|
' Could be an attempt at a replay attack or an incorrect clock on a remote machine.')
|
|
return {}
|
|
return pcmd
|
|
|
|
|
|
class CMDChecker:
|
|
|
|
def __call__(self, pcmd: dict[str, Any], window: Optional['Window'], from_socket: bool, extra_data: dict[str, Any]) -> bool | None:
|
|
return False
|
|
|
|
|
|
@lru_cache(maxsize=64)
|
|
def is_cmd_allowed_loader(path: str) -> CMDChecker:
|
|
import runpy
|
|
try:
|
|
m = runpy.run_path(path)
|
|
func: CMDChecker = m['is_cmd_allowed']
|
|
except Exception as e:
|
|
log_error(f'Failed to load cmd check function from {path} with error: {e}')
|
|
func = CMDChecker()
|
|
return func
|
|
|
|
|
|
@lru_cache(maxsize=1024)
|
|
def fnmatch_pattern(pat: str) -> 're.Pattern[str]':
|
|
from fnmatch import translate
|
|
return re.compile(translate(pat))
|
|
|
|
|
|
def remote_control_allowed(
|
|
pcmd: dict[str, Any], remote_control_passwords: dict[str, Sequence[str]] | None,
|
|
window: Optional['Window'], extra_data: dict[str, Any]
|
|
) -> bool:
|
|
if not remote_control_passwords:
|
|
return True
|
|
pw = pcmd.get('password', '')
|
|
auth_items = remote_control_passwords.get(pw)
|
|
if pw == '!':
|
|
auth_items = None
|
|
if auth_items is None:
|
|
if '!' in remote_control_passwords:
|
|
raise PermissionError()
|
|
return False
|
|
from .remote_control import password_authorizer
|
|
pa = password_authorizer(auth_items)
|
|
if not pa.is_cmd_allowed(pcmd, window, False, extra_data):
|
|
raise PermissionError()
|
|
return True
|
|
|
|
|
|
class PasswordAuthorizer:
|
|
|
|
def __init__(self, auth_items: Iterable[str]) -> None:
|
|
self.command_patterns = []
|
|
self.function_checkers = []
|
|
self.name = ''
|
|
for item in auth_items:
|
|
if item.endswith('.py'):
|
|
path = os.path.abspath(resolve_custom_file(item))
|
|
self.function_checkers.append(is_cmd_allowed_loader(path))
|
|
else:
|
|
self.command_patterns.append(fnmatch_pattern(item))
|
|
|
|
def is_cmd_allowed(self, pcmd: dict[str, Any], window: Optional['Window'], from_socket: bool, extra_data: dict[str, Any]) -> bool:
|
|
cmd_name = pcmd.get('cmd')
|
|
if not cmd_name:
|
|
return False
|
|
if not self.function_checkers and not self.command_patterns:
|
|
return True
|
|
for x in self.command_patterns:
|
|
if x.match(cmd_name) is not None:
|
|
return True
|
|
for f in self.function_checkers:
|
|
try:
|
|
ret = f(pcmd, window, from_socket, extra_data)
|
|
except Exception as e:
|
|
import traceback
|
|
traceback.print_exc()
|
|
log_error(f'There was an error using a custom RC auth function, blocking the remote command. Error: {e}')
|
|
ret = False
|
|
if ret is not None:
|
|
return ret
|
|
return False
|
|
|
|
|
|
@lru_cache(maxsize=256)
|
|
def password_authorizer(auth_items: frozenset[str]) -> PasswordAuthorizer:
|
|
return PasswordAuthorizer(auth_items)
|
|
|
|
|
|
user_password_allowed: dict[str, bool] = {}
|
|
|
|
|
|
def is_cmd_allowed(pcmd: dict[str, Any], window: Optional['Window'], from_socket: bool, extra_data: dict[str, Any]) -> bool | None:
|
|
sid = pcmd.get('stream_id', '')
|
|
if sid and active_streams.get(sid, '') == pcmd['cmd']:
|
|
return True
|
|
if 'cancel_async' in pcmd and pcmd.get('async_id'):
|
|
# we allow these without authentication as they are sent on error
|
|
# conditions and we can't have users prompted for these. The worst side
|
|
# effect of a malicious cancel_async request is that it can prevent
|
|
# another async request from getting a result, if it knows the async_id
|
|
# of that request.
|
|
return True
|
|
pw = pcmd.get('password', '')
|
|
if not pw:
|
|
auth_items = get_options().remote_control_password.get('')
|
|
if auth_items is None:
|
|
return False
|
|
pa = password_authorizer(auth_items)
|
|
return pa.is_cmd_allowed(pcmd, window, from_socket, extra_data)
|
|
q = user_password_allowed.get(pw)
|
|
if q is not None:
|
|
return q
|
|
auth_items = get_options().remote_control_password.get(pw)
|
|
if auth_items is None:
|
|
return None
|
|
pa = password_authorizer(auth_items)
|
|
return pa.is_cmd_allowed(pcmd, window, from_socket, extra_data)
|
|
|
|
|
|
def set_user_password_allowed(pwd: str, allowed: bool = True) -> None:
|
|
user_password_allowed[pwd] = allowed
|
|
|
|
|
|
def close_active_stream(stream_id: str) -> None:
|
|
active_streams.pop(stream_id, None)
|
|
|
|
|
|
def handle_cmd(
|
|
boss: BossType, window: WindowType | None, cmd: dict[str, Any], peer_id: int, self_window: WindowType | None
|
|
) -> dict[str, Any] | None | AsyncResponse:
|
|
v = cmd['version']
|
|
no_response = cmd.get('no_response', False)
|
|
if tuple(v)[:2] > version[:2]:
|
|
if no_response:
|
|
return None
|
|
return {'ok': False, 'error': 'The kitty client you are using to send remote commands is newer than this kitty instance. This is not supported.'}
|
|
c = command_for_name(cmd['cmd'])
|
|
payload = cmd.get('payload') or {}
|
|
payload['peer_id'] = peer_id
|
|
async_id = str(cmd.get('async', ''))
|
|
stream_id = str(cmd.get('stream_id', ''))
|
|
stream = bool(cmd.get('stream', False))
|
|
if (stream or stream_id) and not c.reads_streaming_data:
|
|
return {'ok': False, 'error': 'Streaming send of data is not supported for this command'}
|
|
if stream_id:
|
|
payload['stream_id'] = stream_id
|
|
active_streams[stream_id] = cmd['cmd']
|
|
if len(active_streams) > 32:
|
|
oldest = next(iter(active_streams))
|
|
del active_streams[oldest]
|
|
if async_id:
|
|
payload['async_id'] = async_id
|
|
if 'cancel_async' in cmd:
|
|
active_async_requests.pop(async_id, None)
|
|
c.cancel_async_request(boss, self_window or window, PayloadGetter(c, payload))
|
|
return None
|
|
active_async_requests[async_id] = monotonic()
|
|
if len(active_async_requests) > 32:
|
|
oldest = next(iter(active_async_requests))
|
|
del active_async_requests[oldest]
|
|
try:
|
|
ans = c.response_from_kitty(boss, self_window or window, PayloadGetter(c, payload))
|
|
except Exception:
|
|
if no_response: # don't report errors if --no-response was used
|
|
return None
|
|
raise
|
|
if isinstance(ans, NoResponse):
|
|
return None
|
|
if isinstance(ans, AsyncResponse):
|
|
if stream:
|
|
return {'ok': True, 'stream': True}
|
|
return ans
|
|
response: dict[str, Any] = {'ok': True}
|
|
if ans is not None:
|
|
response['data'] = ans
|
|
if not no_response:
|
|
return response
|
|
return None
|
|
|
|
|
|
global_options_spec = partial('''\
|
|
--to
|
|
An address for the kitty instance to control. Corresponds to the address given
|
|
to the kitty instance via the :option:`kitty --listen-on` option or the
|
|
:opt:`listen_on` setting in :file:`kitty.conf`. If not specified, the
|
|
environment variable :envvar:`KITTY_LISTEN_ON` is checked. If that is also not
|
|
found, messages are sent to the controlling terminal for this process, i.e.
|
|
they will only work if this process is run within a kitty window.
|
|
|
|
|
|
--password
|
|
A password to use when contacting kitty. This will cause kitty to ask the user
|
|
for permission to perform the specified action, unless the password has been
|
|
accepted before or is pre-configured in :file:`kitty.conf`. To use a blank password
|
|
specify :option:`kitten @ --use-password` as :code:`always`.
|
|
|
|
|
|
--password-file
|
|
completion=type:file relative:conf kwds:-
|
|
default=rc-pass
|
|
A file from which to read the password. Trailing whitespace is ignored. Relative
|
|
paths are resolved from the kitty configuration directory. Use - to read from STDIN.
|
|
Use :code:`fd:num` to read from the file descriptor :code:`num`.
|
|
Used if no :option:`kitten @ --password` is supplied. Defaults to checking for the
|
|
:file:`rc-pass` file in the kitty configuration directory.
|
|
|
|
|
|
--password-env
|
|
default=KITTY_RC_PASSWORD
|
|
The name of an environment variable to read the password from.
|
|
Used if no :option:`kitten @ --password-file` is supplied. Defaults
|
|
to checking the environment variable :envvar:`KITTY_RC_PASSWORD`.
|
|
|
|
|
|
--use-password
|
|
default=if-available
|
|
choices=if-available,never,always
|
|
If no password is available, kitty will usually just send the remote control command
|
|
without a password. This option can be used to force it to :code:`always` or :code:`never` use
|
|
the supplied password. If set to always and no password is provided, the blank password is used.
|
|
'''.format, appname=appname)
|
|
|
|
|
|
def encode_send(send: Any) -> bytes:
|
|
es = ('@kitty-cmd' + json.dumps(send)).encode('ascii')
|
|
return b'\x1bP' + es + b'\x1b\\'
|
|
|
|
|
|
class SocketClosed(EOFError):
|
|
pass
|
|
|
|
|
|
class SocketIO:
|
|
|
|
def __init__(self, to: str):
|
|
self.family, self.address = parse_address_spec(to)[:2]
|
|
|
|
def __enter__(self) -> None:
|
|
import socket
|
|
self.socket = socket.socket(self.family)
|
|
self.socket.setblocking(True)
|
|
self.socket.connect(self.address)
|
|
|
|
def __exit__(self, *a: Any) -> None:
|
|
import socket
|
|
with suppress(OSError): # on some OSes such as macOS the socket is already closed at this point
|
|
self.socket.shutdown(socket.SHUT_RDWR)
|
|
self.socket.close()
|
|
|
|
def send(self, data: bytes | Iterable[str | bytes]) -> None:
|
|
import socket
|
|
with self.socket.makefile('wb') as out:
|
|
if isinstance(data, bytes):
|
|
out.write(data)
|
|
else:
|
|
for chunk in data:
|
|
if isinstance(chunk, str):
|
|
chunk = chunk.encode('utf-8')
|
|
out.write(chunk)
|
|
out.flush()
|
|
self.socket.shutdown(socket.SHUT_WR)
|
|
|
|
def simple_recv(self, timeout: float) -> bytes:
|
|
dcs = re.compile(br'\x1bP@kitty-cmd([^\x1b]+)\x1b\\')
|
|
self.socket.settimeout(timeout)
|
|
st = monotonic()
|
|
with self.socket.makefile('rb') as src:
|
|
data = src.read()
|
|
m = dcs.search(data)
|
|
if m is None:
|
|
if monotonic() - st > timeout:
|
|
raise TimeoutError('Timed out while waiting to read cmd response')
|
|
raise SocketClosed('Remote control connection was closed by kitty without any response being received')
|
|
return bytes(m.group(1))
|
|
|
|
|
|
class RCIO(TTYIO):
|
|
|
|
def simple_recv(self, timeout: float) -> bytes:
|
|
ans: list[bytes] = []
|
|
read_command_response(self.tty_fd, timeout, ans)
|
|
return b''.join(ans)
|
|
|
|
|
|
def do_io(
|
|
to: str | None, original_cmd: dict[str, Any], no_response: bool, response_timeout: float, encrypter: 'CommandEncrypter'
|
|
) -> dict[str, Any]:
|
|
payload = original_cmd.get('payload')
|
|
if not isinstance(payload, GeneratorType):
|
|
send_data: bytes | Iterator[bytes] = encode_send(encrypter(original_cmd))
|
|
else:
|
|
def send_generator() -> Iterator[bytes]:
|
|
assert payload is not None
|
|
for chunk in payload:
|
|
original_cmd['payload'] = chunk
|
|
yield encode_send(encrypter(original_cmd))
|
|
send_data = send_generator()
|
|
|
|
io: SocketIO | RCIO = SocketIO(to) if to else RCIO()
|
|
with io:
|
|
io.send(send_data)
|
|
if no_response:
|
|
return {'ok': True}
|
|
received = io.simple_recv(timeout=response_timeout)
|
|
|
|
return cast(dict[str, Any], json.loads(received.decode('ascii')))
|
|
|
|
|
|
cli_msg = (
|
|
'Control {appname} by sending it commands. Set the'
|
|
' :opt:`allow_remote_control` option in :file:`kitty.conf` or use a password, for this'
|
|
' to work.'
|
|
).format(appname=appname)
|
|
|
|
|
|
def parse_rc_args(args: list[str]) -> tuple[RCOptions, list[str]]:
|
|
cmap = {name: command_for_name(name) for name in sorted(all_command_names())}
|
|
cmds = (f' :green:`{cmd.name}`\n {cmd.short_desc}' for c, cmd in cmap.items())
|
|
msg = cli_msg + (
|
|
'\n\n:title:`Commands`:\n{cmds}\n\n'
|
|
'You can get help for each individual command by using:\n'
|
|
'{appname} @ :italic:`command` -h').format(appname=appname, cmds='\n'.join(cmds))
|
|
return parse_args(args[1:], global_options_spec, 'command ...', msg, f'{appname} @', result_class=RCOptions)
|
|
|
|
|
|
def encode_as_base85(data: bytes) -> str:
|
|
return base64.b85encode(data).decode('ascii')
|
|
|
|
|
|
class CommandEncrypter:
|
|
|
|
encrypts: bool = True
|
|
|
|
def __init__(self, pubkey: bytes, encryption_version: str, password: str) -> None:
|
|
skey = EllipticCurveKey()
|
|
self.secret = skey.derive_secret(pubkey)
|
|
self.pubkey = skey.public
|
|
self.encryption_version = encryption_version
|
|
self.password = password
|
|
|
|
def __call__(self, cmd: dict[str, Any]) -> dict[str, Any]:
|
|
encrypter = AES256GCMEncrypt(self.secret)
|
|
cmd['timestamp'] = time_ns()
|
|
cmd['password'] = self.password
|
|
raw = json.dumps(cmd).encode('utf-8')
|
|
encrypted = encrypter.add_data_to_be_encrypted(raw, True)
|
|
ans = {
|
|
'version': version, 'iv': encode_as_base85(encrypter.iv), 'tag': encode_as_base85(encrypter.tag),
|
|
'pubkey': encode_as_base85(self.pubkey), 'encrypted': encode_as_base85(encrypted),
|
|
}
|
|
if self.encryption_version != '1':
|
|
ans['enc_proto'] = self.encryption_version
|
|
return ans
|
|
|
|
def adjust_response_timeout_for_password(self, response_timeout: float) -> float:
|
|
return max(response_timeout, 120)
|
|
|
|
|
|
class NoEncryption(CommandEncrypter):
|
|
|
|
encrypts: bool = False
|
|
|
|
def __init__(self) -> None: ...
|
|
|
|
def __call__(self, cmd: dict[str, Any]) -> dict[str, Any]:
|
|
return cmd
|
|
|
|
def adjust_response_timeout_for_password(self, response_timeout: float) -> float:
|
|
return response_timeout
|
|
|
|
|
|
def create_basic_command(name: str, payload: Any = None, no_response: bool = False, is_asynchronous: bool = False) -> dict[str, Any]:
|
|
ans = {'cmd': name, 'version': version, 'no_response': no_response}
|
|
if payload is not None:
|
|
ans['payload'] = payload
|
|
if is_asynchronous:
|
|
from kitty.short_uuid import uuid4
|
|
ans['async'] = uuid4()
|
|
return ans
|
|
|
|
|
|
def send_response_to_client(data: Any = None, error: str = '', peer_id: int = 0, window_id: int = 0, async_id: str = '') -> None:
|
|
if active_async_requests.pop(async_id, None) is None:
|
|
return
|
|
if error:
|
|
response: dict[str, bool | int | str] = {'ok': False, 'error': error}
|
|
else:
|
|
response = {'ok': True, 'data': data}
|
|
if peer_id > 0:
|
|
send_data_to_peer(peer_id, encode_response_for_peer(response))
|
|
elif window_id > 0:
|
|
w = get_boss().window_id_map.get(window_id)
|
|
if w is not None:
|
|
w.send_cmd_response(response)
|
|
|
|
|
|
def get_password(opts: RCOptions) -> str:
|
|
if opts.use_password == 'never':
|
|
return ''
|
|
ans = ''
|
|
if opts.password:
|
|
ans = opts.password
|
|
if not ans and opts.password_file:
|
|
if opts.password_file == '-':
|
|
if sys.stdin.isatty():
|
|
from getpass import getpass
|
|
ans = getpass()
|
|
else:
|
|
ans = sys.stdin.read().rstrip()
|
|
try:
|
|
tty_fd = os.open(os.ctermid(), os.O_RDONLY | os.O_CLOEXEC)
|
|
except OSError:
|
|
pass
|
|
else:
|
|
with open(tty_fd, closefd=True):
|
|
os.dup2(tty_fd, sys.stdin.fileno())
|
|
else:
|
|
try:
|
|
with open(resolve_custom_file(opts.password_file)) as f:
|
|
ans = f.read().rstrip()
|
|
except OSError:
|
|
pass
|
|
if not ans and opts.password_env:
|
|
ans = os.environ.get(opts.password_env, '')
|
|
if not ans and opts.use_password == 'always':
|
|
raise SystemExit('No password was found')
|
|
if ans and len(ans) > 1024:
|
|
raise SystemExit('Specified password is too long')
|
|
return ans
|
|
|
|
|
|
def get_pubkey() -> tuple[str, bytes]:
|
|
raw = os.environ.get('KITTY_PUBLIC_KEY', '')
|
|
if not raw:
|
|
raise SystemExit('Password usage requested but KITTY_PUBLIC_KEY environment variable is not available')
|
|
version, pubkey = raw.split(':', 1)
|
|
if version != RC_ENCRYPTION_PROTOCOL_VERSION:
|
|
raise SystemExit('KITTY_PUBLIC_KEY has unknown version, if you are running on a remote system, update kitty on this system')
|
|
from base64 import b85decode
|
|
return version, b85decode(pubkey)
|