Files
swift-mirror/utils/remote-run
Justice Adams 444fcbc528 Update remote-run (#64880)
replace encoding errors when using utf-8
2023-04-04 12:01:50 -07:00

474 lines
18 KiB
Python
Executable File

#!/usr/bin/env python3
# remote-run - Runs a command on another machine, for testing -----*- python -*-
#
# This source file is part of the Swift.org open source project
#
# Copyright (c) 2018 Apple Inc. and the Swift project authors
# Licensed under Apache License v2.0 with Runtime Library Exception
#
# See https://swift.org/LICENSE.txt for license information
# See https://swift.org/CONTRIBUTORS.txt for the list of Swift project authors
#
# ----------------------------------------------------------------------------
import argparse
import os
import posixpath
import subprocess
import sys
import shutil
def quote(arg):
    """Return *arg* quoted as a single word for a POSIX shell.

    The result is always wrapped in single quotes, so plain arguments are
    rendered exactly as before (``abc`` -> ``'abc'``).  Unlike ``repr``,
    which produces *Python* string syntax, this is safe for values that
    contain single quotes, backslashes or control characters: the shell
    receives the original characters unchanged and performs no expansion.

    :param arg: the argument to quote; converted with ``str`` first.
    :returns: the quoted string.
    """
    # Inside single quotes every character is literal, so the only character
    # needing special treatment is the single quote itself: close the quoted
    # run, emit a double-quoted "'", then reopen the single-quoted run.
    return "'" + str(arg).replace("'", "'\"'\"'") + "'"
class CommandRunner(object):
    """Shared logic for running commands and rsync transfers with retries.

    This class calls ``self.remote_invocation``, ``self.rsync_to_invocation``
    and ``self.rsync_from_invocation`` but does not define them; subclasses
    are expected to provide those methods.
    """

    # Total number of times a command is started before giving up when it
    # keeps failing with an error recognised as transient.
    MAX_ATTEMPTS = 3

    def __init__(self):
        # Echo each command to stderr before running it.
        self.verbose = False
        # When set, commands are never actually started.
        self.dry_run = False
        # When set, a non-zero rsync exit status does not abort the script.
        self.ignore_rsync_failure = False

    @staticmethod
    def _dirnames(files):
        """Return the sorted, de-duplicated directory names of *files*."""
        return sorted(set(posixpath.dirname(f) for f in files))

    def popen(self, command, **kwargs):
        """Start *command* with ``subprocess.Popen``.

        The command is echoed to stderr in verbose mode.  In dry-run mode
        nothing is started and ``None`` is returned.
        """
        if self.verbose:
            print(' '.join(command), file=sys.stderr)
        if self.dry_run:
            return None
        return subprocess.Popen(command, **kwargs)

    def mkdirs_remote(self, directories):
        """Create *directories* (a list of paths) via ``run_remote``.

        Does nothing when the list is empty.
        """
        if directories:
            mkdir_command = ['/bin/mkdir', '-p'] + directories
            self.run_remote(mkdir_command)

    def send(self, input_prefix, remote_prefix, local_to_remote_files):
        """Upload *local_to_remote_files* from *input_prefix* to
        *remote_prefix*."""
        # Prepare the remote directory structure.
        self.mkdirs_remote([remote_prefix])
        self.run_rsync_to(input_prefix, local_to_remote_files, remote_prefix)

    def fetch(self, output_prefix, remote_prefix, remote_to_local_files):
        """Download *remote_to_local_files* from *remote_prefix* into
        *output_prefix*."""
        # Prepare the local directory structure.
        mkdir_command = ['/bin/mkdir', '-p', output_prefix]
        if self.verbose:
            print(' '.join(mkdir_command), file=sys.stderr)
        if not self.dry_run:
            subprocess.check_call(mkdir_command)
        self.run_rsync_from(remote_prefix, remote_to_local_files,
                            output_prefix)

    # Recover from random and transient errors that occur when SSHing to
    # devices, in particular devicecompute.
    @staticmethod
    def should_remote_command_retry(stderr):
        """Return True when *stderr* contains a known transient error."""
        transient_markers = (
            "banner line contains invalid characters",
            "Connection to localhost closed by remote host",
            "kex_exchange_identification: Connection closed by remote host",
            "rsync error: unexplained error",
        )
        # Anything else is not a known error and shouldn't be retried.
        return any(marker in stderr for marker in transient_markers)

    def run_remote(self, command, remote_env=None):
        """Run *command* through ``self.remote_invocation``.

        :param command: list of argument strings.
        :param remote_env: optional mapping of environment variables, passed
            to the command through ``/usr/bin/env``.

        The command's stdout and stderr are forwarded to this process's
        stdout and stderr.  A non-zero exit status terminates the script with
        that status.  Known transient failures are retried, up to
        ``MAX_ATTEMPTS`` starts in total.
        """
        if remote_env is None:
            remote_env = {}
        env_strings = ['{0}={1}'.format(k, v)
                       for k, v in sorted(remote_env.items())]
        remote_invocation = self.remote_invocation(
            ['/usr/bin/env'] + env_strings + command)

        remote_proc = None
        stderr = ''
        for _attempt in range(self.MAX_ATTEMPTS):
            remote_proc = self.popen(
                remote_invocation,
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE
            )
            if self.dry_run:
                return
            stdout, stderr = remote_proc.communicate()
            stdout = stdout.decode(encoding='utf-8', errors='replace')
            stderr = stderr.decode(encoding='utf-8', errors='replace')
            # Print stdout to screen
            print(stdout, end='')
            # This is a transient and random error, and if this occurs, we
            # should simply retry our ssh command.
            if self.should_remote_command_retry(stderr):
                continue
            print(stderr, end='', file=sys.stderr)
            # Process error code
            if remote_proc.returncode:
                # FIXME: We may still want to fetch the output files to see
                # what went wrong.
                sys.exit(remote_proc.returncode)
            # Nothing went wrong. Return
            return

        # Every attempt hit a transient error.  Show the last error instead
        # of discarding it, then give up with the last exit status.
        if remote_proc is not None:
            print(stderr, end='', file=sys.stderr)
            sys.exit(remote_proc.returncode)

    def run_rsync(self, invocation, sources):
        """Run the rsync *invocation*, feeding it *sources* on stdin.

        :param invocation: the full rsync argument list.
        :param sources: iterable of path strings, sent one per line.

        A non-zero exit status terminates the script unless
        ``ignore_rsync_failure`` is set.  Known transient failures are
        retried, up to ``MAX_ATTEMPTS`` starts in total.
        """
        # Build the file list once, outside the retry loop: joining inside
        # the loop would re-join the already-joined string character by
        # character on a retry and send a corrupted list to rsync.
        file_list = '\n'.join(sources)
        payload = file_list.encode('utf-8')

        rsync_proc = None
        stderr = ''
        for _attempt in range(self.MAX_ATTEMPTS):
            rsync_proc = self.popen(
                invocation,
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE
            )
            if self.dry_run:
                return
            if self.verbose:
                print(file_list, file=sys.stderr)
            stdout, stderr = rsync_proc.communicate(payload)
            stdout = stdout.decode(encoding='utf-8', errors='replace')
            stderr = stderr.decode(encoding='utf-8', errors='replace')
            # Print stdout to screen
            print(stdout, end='')
            # This is a transient and random error, and if this occurs, we
            # should simply retry our rsync command.
            if self.should_remote_command_retry(stderr):
                continue
            print(stderr, end='', file=sys.stderr)
            # Process error code
            if rsync_proc.returncode and not self.ignore_rsync_failure:
                sys.exit(rsync_proc.returncode)
            # Either nothing went wrong or failures are being ignored.
            return

        # Every attempt hit a transient error.  Show the last error instead
        # of discarding it, then give up with the last exit status.
        if rsync_proc is not None:
            print(stderr, end='', file=sys.stderr)
            sys.exit(rsync_proc.returncode)

    def run_rsync_to(self, prefix, sources, dest):
        """Transfer *sources* from *prefix* to *dest* using the invocation
        built by ``rsync_to_invocation``."""
        self.run_rsync(self.rsync_to_invocation(prefix, dest), sources)

    def run_rsync_from(self, prefix, sources, dest):
        """Transfer *sources* from *prefix* to *dest* using the invocation
        built by ``rsync_from_invocation``."""
        self.run_rsync(self.rsync_from_invocation(prefix, dest), sources)
class RemoteCommandRunner(CommandRunner):
    """Command runner that builds ssh and rsync-over-ssh invocations."""

    def __init__(self, host, identity_path, ssh_options, config_file):
        """
        :param host: the host to connect to; when it contains a ``:`` the text
            after the last ``:`` is used as the port.
        :param identity_path: value for ssh's ``-i`` option, or a false value
            to omit it.
        :param ssh_options: iterable of strings, each passed with ``-o``.
        :param config_file: value for ssh's ``-F`` option, or a false value
            to omit it.
        """
        # Initialise verbose/dry_run/ignore_rsync_failure so the runner is
        # usable even if the caller never assigns them.
        super().__init__()
        if ':' in host:
            (self.remote_host, self.port) = host.rsplit(':', 1)
        else:
            self.remote_host = host
            self.port = None
        self.identity_path = identity_path
        self.ssh_options = ssh_options
        self.config_file = config_file

    def common_options(self, port_flag):
        """Return the ssh options shared by every invocation.

        :param port_flag: the flag used to introduce the port number.
        """
        port_option = [port_flag, self.port] if self.port else []
        config_option = ['-F', self.config_file] if self.config_file else []
        identity_option = (
            ['-i', self.identity_path] if self.identity_path else [])
        # Interleave '-o' with each custom option.
        # From https://stackoverflow.com/a/8168526,
        # with explanatory help from
        # https://spapas.github.io/2016/04/27/python-nested-list-comprehensions/
        extra_options = [arg for option in self.ssh_options
                         for arg in ["-o", option]]
        return port_option + identity_option + config_option + extra_options

    def remote_invocation(self, command):
        """Return the ssh argument list that runs *command* on the host.

        Each element of *command* is passed through ``quote``.
        """
        return (['/usr/bin/ssh', '-n'] +
                self.common_options(port_flag='-p') +
                [self.remote_host, '--'] +
                [quote(arg) for arg in command])

    def rsync_invocation(self, source, dest):
        """Return the rsync argument list copying *source* to *dest*.

        The list of files is read from stdin (``--files-from=-``), and the
        ssh command with its options is handed to rsync through ``-e``.
        """
        return ['/usr/bin/rsync', '-arRz', '--files-from=-',
                '-e', ' '.join([quote(x) for x in
                                ['/usr/bin/ssh'] +
                                self.common_options(port_flag='-p')]),
                source,
                dest]

    def rsync_from_invocation(self, source, dest):
        """Return the rsync invocation whose source is on the remote host."""
        return self.rsync_invocation(self.remote_host + ':' + source, dest)

    def rsync_to_invocation(self, source, dest):
        """Return the rsync invocation whose destination is on the remote
        host."""
        return self.rsync_invocation(source, self.remote_host + ':' + dest)
class LocalCommandRunner(CommandRunner):
    """Command runner that builds invocations without any ssh wrapping."""

    def remote_invocation(self, command):
        """Return *command* itself, unchanged."""
        return command

    def rsync_invocation(self, source, dest):
        """Return the rsync argument list copying *source* to *dest*, with
        the file list read from stdin."""
        rsync_flags = ['-arz', '--files-from=-']
        endpoints = [source, dest]
        return ['/usr/bin/rsync'] + rsync_flags + endpoints

    def rsync_from_invocation(self, source, dest):
        """Same as ``rsync_invocation``; neither endpoint is decorated."""
        return self.rsync_invocation(source, dest)

    def rsync_to_invocation(self, source, dest):
        """Same as ``rsync_invocation``; neither endpoint is decorated."""
        return self.rsync_invocation(source, dest)
def strip_sep(name):
    """Return *name* with every leading path separator removed."""
    # posixpath.sep is the single character '/', so stripping that character
    # from the left is the same as repeatedly dropping a leading separator.
    return name.lstrip(posixpath.sep)
class RemotePathSet(object):
    """Holds the sets of paths collected while processing arguments."""

    def __init__(self):
        # Input paths, and input paths recorded under the "nodir" category.
        self.inputs = set()
        self.nodir_inputs = set()

        # Output paths, and the subset recorded as already existing.
        self.outputs = set()
        self.existing_outputs = set()

        # The same two categories for outputs recorded as "nodir".
        self.nodir_outputs = set()
        self.existing_nodir_outputs = set()
class PrefixProcessor(object):
    """Rewrites strings that start with the local input or output prefix
    into paths under the remote directory, recording them in a path set."""

    def __init__(self,
                 input_prefix, output_prefix,
                 remote_dir, remote_input_prefix,
                 remote_output_prefix,
                 path_set):
        """
        :param input_prefix: local input prefix; may be empty or None.
        :param output_prefix: local output prefix; may be empty or None.
        :param remote_dir: directory that rewritten paths are placed under.
        :param remote_input_prefix: sub-path of *remote_dir* for inputs;
            must not start with ``..``.
        :param remote_output_prefix: sub-path of *remote_dir* for outputs;
            must not start with ``..``.
        :param path_set: object whose set attributes collect the paths seen.
        """
        assert not remote_input_prefix.startswith('..')
        assert not remote_output_prefix.startswith('..')

        self.remote_dir = remote_dir
        self.remote_input_prefix = remote_input_prefix
        self.remote_output_prefix = remote_output_prefix
        self.path_set = path_set

        # Prefixes are stored without trailing separators.
        self.input_prefix = self._without_trailing_sep(input_prefix)
        self.output_prefix = self._without_trailing_sep(output_prefix)

        # (length, directory part, final component) of each prefix.
        self.input_prefix_split = self._describe(self.input_prefix)
        self.output_prefix_split = self._describe(self.output_prefix)

    @staticmethod
    def _without_trailing_sep(prefix):
        """Drop all trailing separators; false values pass through as-is."""
        if not prefix:
            return prefix
        return prefix.rstrip(posixpath.sep)

    @staticmethod
    def _describe(prefix):
        """Return ``(len, dirname, basename)`` of *prefix*, or zeros/empties
        when there is no prefix."""
        if not prefix:
            return (0, '', '')
        head, tail = posixpath.split(prefix)
        return (len(prefix), head, tail)

    def process_one(self, orig_name):
        """Return the remote form of *orig_name*.

        Strings starting with the input prefix are checked first, then those
        starting with the output prefix; anything else is returned unchanged.
        When the text after the prefix does not begin with a separator, the
        prefix's final component is glued back on and the result is recorded
        in the "nodir" set instead.  Output paths that exist locally are also
        recorded in the matching "existing" set.
        """
        in_len, _in_dir, in_base = self.input_prefix_split
        out_len, _out_dir, out_base = self.output_prefix_split
        paths = self.path_set

        if in_len and orig_name.startswith(self.input_prefix):
            remainder = orig_name[in_len:]
            if remainder.startswith(posixpath.sep):
                relative = strip_sep(remainder)
                paths.inputs.add(relative)
            else:
                relative = in_base + remainder
                paths.nodir_inputs.add(relative)
            return posixpath.join(self.remote_dir,
                                  self.remote_input_prefix,
                                  relative)

        if out_len and orig_name.startswith(self.output_prefix):
            remainder = orig_name[out_len:]
            if remainder.startswith(posixpath.sep):
                relative = strip_sep(remainder)
                seen, existing = paths.outputs, paths.existing_outputs
            else:
                relative = out_base + remainder
                seen = paths.nodir_outputs
                existing = paths.existing_nodir_outputs
            seen.add(relative)
            if os.path.exists(orig_name):
                existing.add(relative)
            return posixpath.join(self.remote_dir,
                                  self.remote_output_prefix,
                                  relative)

        return orig_name
class ArgumentProcessor(PrefixProcessor):
    """Applies ``process_one`` to every element of an argument list."""

    def __init__(self,
                 input_prefix, output_prefix,
                 remote_dir, remote_input_prefix, remote_output_prefix,
                 path_set, arguments):
        """Takes the same parameters as the base class, plus *arguments*,
        the list of strings to be processed."""
        super().__init__(
            input_prefix,
            output_prefix,
            remote_dir,
            remote_input_prefix,
            remote_output_prefix,
            path_set,
        )
        self.original_args = arguments

    def process_args(self):
        """Store the processed arguments, in order, in ``self.args``."""
        self.args = [self.process_one(argument)
                     for argument in self.original_args]
class EnvVarProcessor(PrefixProcessor):
    """Applies ``process_one`` to every value of an environment mapping."""

    def __init__(self,
                 input_prefix, output_prefix,
                 remote_dir, remote_input_prefix, remote_output_prefix,
                 path_set, environment):
        """Takes the same parameters as the base class, plus *environment*,
        the mapping whose values are to be processed."""
        super().__init__(
            input_prefix,
            output_prefix,
            remote_dir,
            remote_input_prefix,
            remote_output_prefix,
            path_set,
        )
        self.original_env = environment

    def process_env(self):
        """Store a dict with the same keys and processed values in
        ``self.env``."""
        self.env = {name: self.process_one(setting)
                    for name, setting in self.original_env.items()}
def collect_remote_env(local_env=os.environ, prefix='REMOTE_RUN_CHILD_'):
    """Return the entries of *local_env* whose key starts with *prefix*.

    The prefix is removed from each key in the returned dict; values are
    left untouched.
    """
    skip = len(prefix)
    return {
        name[skip:]: setting
        for name, setting in local_env.items()
        if name.startswith(prefix)
    }
def main():
    """Parse the command line, then upload, run and download.

    The steps, in order:

    1. Rewrite command arguments and ``REMOTE_RUN_CHILD_*`` environment
       values that start with the input/output prefix into remote paths.
    2. If an output prefix was given, remove the remote output directory and
       recreate the directories needed for the outputs.
    3. Upload inputs, and outputs that already exist locally.
    4. Run the command.
    5. Download the outputs.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('-v', '--verbose', action='store_true', dest='verbose',
                        help='print commands as they are run')
    parser.add_argument('-n', '--dry-run', action='store_true', dest='dry_run',
                        help="print the commands that would have been run, but "
                             "don't actually run them")

    parser.add_argument('--remote-dir', required=True, metavar='PATH',
                        help='(required) a writable temporary path on the '
                             'remote machine')
    parser.add_argument('--input-prefix',
                        help='arguments matching this prefix will be uploaded')
    parser.add_argument('--output-prefix',
                        help='arguments matching this prefix will be both '
                             'uploaded and downloaded')
    parser.add_argument('--remote-input-prefix', default='input',
                        help='input arguments use this prefix on the remote '
                             'machine')
    parser.add_argument('--remote-output-prefix', default='output',
                        help='output arguments use this prefix on the remote '
                             'machine')

    parser.add_argument('-i', '--identity', dest='identity', metavar='FILE',
                        help='an SSH identity file (private key) to use')
    parser.add_argument('-F', '--config-file', dest='config_file', metavar='FILE',
                        help='an SSH configuration file')
    parser.add_argument('-o', '--ssh-option', action='append', default=[],
                        dest='ssh_options', metavar='OPTION',
                        help='extra SSH config options (man ssh_config)')
    parser.add_argument('--debug-as-local', action='store_true',
                        help='run commands locally instead of over SSH, for '
                             'debugging purposes. The "host" argument is '
                             'omitted.')
    parser.add_argument('--ignore-rsync-failure', action='store_true',
                        help='ignore rsync failures, for debugging.')

    parser.add_argument('host',
                        help='the host to connect to, in the form '
                             '[user@]host[:port]')
    parser.add_argument('command', nargs=argparse.REMAINDER,
                        help='the command to run', metavar='command...')

    args = parser.parse_args()

    # The output directory under --remote-dir is removed with 'rm -rf' below,
    # so refuse the filesystem root.  This is reported as a usage error
    # rather than an assert, because asserts are stripped under 'python -O'.
    if args.remote_dir == '/':
        parser.error("--remote-dir must not be '/'")

    if args.debug_as_local:
        runner = LocalCommandRunner()
        # With no real host, the first positional word is part of the command.
        args.command.insert(0, args.host)
        del args.host
    else:
        runner = RemoteCommandRunner(args.host,
                                     args.identity,
                                     args.ssh_options,
                                     args.config_file)
    runner.dry_run = args.dry_run
    # A dry run only makes sense if the commands are shown.
    runner.verbose = args.verbose or args.dry_run
    runner.ignore_rsync_failure = args.ignore_rsync_failure

    remote_input_prefix = posixpath.normpath(args.remote_input_prefix)
    remote_output_prefix = posixpath.normpath(args.remote_output_prefix)

    # Both processors record into the same path set.
    path_set = RemotePathSet()
    argproc = ArgumentProcessor(args.input_prefix,
                                args.output_prefix,
                                args.remote_dir,
                                remote_input_prefix,
                                remote_output_prefix,
                                path_set,
                                args.command)
    argproc.process_args()
    envproc = EnvVarProcessor(args.input_prefix,
                              args.output_prefix,
                              args.remote_dir,
                              remote_input_prefix,
                              remote_output_prefix,
                              path_set,
                              collect_remote_env())
    envproc.process_env()

    input_dir = posixpath.join(args.remote_dir, args.remote_input_prefix)
    output_dir = posixpath.join(args.remote_dir, args.remote_output_prefix)

    if args.output_prefix:
        # Start from a clean output directory, then recreate the directories
        # the outputs will be written into.
        runner.run_remote(['/bin/rm', '-rf', output_dir])
        dirs = set()
        for output in path_set.outputs:
            dirs.add(posixpath.join(output_dir, posixpath.dirname(output)))
        for output in path_set.nodir_outputs:
            dirs.add(posixpath.join(output_dir, posixpath.dirname(output)))
        runner.mkdirs_remote(list(dirs))

    # "nodir" paths are relative to the prefix's parent directory, so they
    # are transferred from/to dirname(prefix) instead of the prefix itself.
    if path_set.inputs:
        runner.send(args.input_prefix, input_dir, path_set.inputs)
    if path_set.nodir_inputs:
        runner.send(posixpath.dirname(args.input_prefix),
                    input_dir, path_set.nodir_inputs)
    if path_set.existing_outputs:
        runner.send(args.output_prefix, output_dir, path_set.existing_outputs)
    if path_set.existing_nodir_outputs:
        runner.send(posixpath.dirname(args.output_prefix),
                    output_dir, path_set.existing_nodir_outputs)

    runner.run_remote(argproc.args, envproc.env)

    if path_set.outputs:
        runner.fetch(args.output_prefix, output_dir, path_set.outputs)
    if path_set.nodir_outputs:
        runner.fetch(posixpath.dirname(args.output_prefix),
                     output_dir, path_set.nodir_outputs)


if __name__ == "__main__":
    main()