Mirror of https://github.com/apple/swift.git
The current remote-run implementation only works if rsync is located in `/usr/bin/`, which isn't always the case: FreeBSD installs rsync into `/usr/local/bin`, since it is not part of the base system. Relax the script so that it finds rsync anywhere on the PATH.
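As a minimal sketch of what "finds rsync anywhere on the PATH" can mean in Python — assuming the fix resolves the binary through PATH rather than a hard-coded location; the find_rsync helper below is illustrative only and is not part of the script, which simply invokes the bare rsync name:

    import shutil


    def find_rsync():
        # Illustrative helper: use whichever rsync is found first on PATH
        # (for example /usr/local/bin/rsync on FreeBSD) rather than assuming
        # /usr/bin/rsync exists.
        rsync_path = shutil.which('rsync')
        if rsync_path is None:
            raise FileNotFoundError('rsync not found on PATH')
        return rsync_path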
474 lines · 18 KiB · Python · Executable File
#!/usr/bin/env python3
# remote-run - Runs a command on another machine, for testing -----*- python -*-
#
# This source file is part of the Swift.org open source project
#
# Copyright (c) 2018 Apple Inc. and the Swift project authors
# Licensed under Apache License v2.0 with Runtime Library Exception
#
# See https://swift.org/LICENSE.txt for license information
# See https://swift.org/CONTRIBUTORS.txt for the list of Swift project authors
#
# ----------------------------------------------------------------------------

import argparse
import os
import posixpath
import subprocess
import sys
import shutil


def quote(arg):
    return repr(arg)

class CommandRunner(object):
    def __init__(self):
        self.verbose = False
        self.dry_run = False
        self.ignore_rsync_failure = False

    @staticmethod
    def _dirnames(files):
        return sorted(set(posixpath.dirname(f) for f in files))

    def popen(self, command, **kwargs):
        if self.verbose:
            print(' '.join(command), file=sys.stderr)
        if self.dry_run:
            return None
        return subprocess.Popen(command, **kwargs)

    def mkdirs_remote(self, directories):
        if directories:
            mkdir_command = ['/bin/mkdir', '-p'] + directories
            self.run_remote(mkdir_command)

    def send(self, input_prefix, remote_prefix, local_to_remote_files):
        # Prepare the remote directory structure.
        self.mkdirs_remote([remote_prefix])

        self.run_rsync_to(input_prefix, local_to_remote_files, remote_prefix)

    def fetch(self, output_prefix, remote_prefix, remote_to_local_files):
        # Prepare the local directory structure.
        mkdir_command = ['/bin/mkdir', '-p', output_prefix]
        if self.verbose:
            print(' '.join(mkdir_command), file=sys.stderr)
        if not self.dry_run:
            subprocess.check_call(mkdir_command)

        self.run_rsync_from(remote_prefix, remote_to_local_files, output_prefix)

    # Recover from random and transient errors that occur when SSHing to
    # devices, in particular devicecompute.
    @staticmethod
    def should_remote_command_retry(stderr):
        if "banner line contains invalid characters" in stderr:
            return True

        if "Connection to localhost closed by remote host" in stderr:
            return True

        if "kex_exchange_identification: Connection closed by remote host" in stderr:
            return True

        if "rsync error: unexplained error" in stderr:
            return True

        # Fallthrough. The error is not known and shouldn't be retried.
        return False

    def run_remote(self, command, remote_env={}):
        env_strings = ['{0}={1}'.format(k, v)
                       for k, v in sorted(remote_env.items())]
        remote_invocation = self.remote_invocation(
            ['/usr/bin/env'] + env_strings + command)
        attempt = 1
        remote_proc = None

        while attempt <= 3:
            remote_proc = self.popen(
                remote_invocation,
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE
            )

            if self.dry_run:
                return

            stdout, stderr = remote_proc.communicate()
            stdout = stdout.decode(encoding='utf-8', errors='replace')
            stderr = stderr.decode(encoding='utf-8', errors='replace')

            # Print stdout to screen.
            print(stdout, end='')

            # Transient and random error: simply retry the ssh command.
            if self.should_remote_command_retry(stderr):
                attempt += 1
                continue

            print(stderr, end='', file=sys.stderr)

            # Process error code.
            if remote_proc.returncode:
                # FIXME: We may still want to fetch the output files to see
                # what went wrong.
                sys.exit(remote_proc.returncode)
            else:
                # Nothing went wrong. Return.
                return

        if attempt > 3 and remote_proc is not None:
            sys.exit(remote_proc.returncode)

    def run_rsync(self, invocation, sources):
        attempt = 1
        rsync_proc = None
        # Join the file list once, outside the retry loop, so that a retry
        # feeds rsync the same newline-separated list instead of re-joining
        # an already-joined string.
        sources = '\n'.join(sources)

        while attempt <= 3:
            rsync_proc = self.popen(
                invocation,
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE
            )

            if self.dry_run:
                return

            if self.verbose:
                print(sources, file=sys.stderr)
            stdout, stderr = rsync_proc.communicate(sources.encode('utf-8'))
            stdout = stdout.decode(encoding='utf-8', errors='replace')
            stderr = stderr.decode(encoding='utf-8', errors='replace')

            # Print stdout to screen.
            print(stdout, end='')

            # Transient and random error: simply retry the rsync command.
            if self.should_remote_command_retry(stderr):
                attempt += 1
                continue

            print(stderr, end='', file=sys.stderr)

            # Process error code.
            if rsync_proc.returncode:
                if self.ignore_rsync_failure:
                    return
                else:
                    sys.exit(rsync_proc.returncode)
            else:
                # Nothing went wrong. Return.
                return

        if attempt > 3 and rsync_proc is not None:
            sys.exit(rsync_proc.returncode)

    def run_rsync_to(self, prefix, sources, dest):
        self.run_rsync(self.rsync_to_invocation(prefix, dest), sources)

    def run_rsync_from(self, prefix, sources, dest):
        self.run_rsync(self.rsync_from_invocation(prefix, dest), sources)

class RemoteCommandRunner(CommandRunner):
    def __init__(self, host, identity_path, ssh_options, config_file):
        if ':' in host:
            (self.remote_host, self.port) = host.rsplit(':', 1)
        else:
            self.remote_host = host
            self.port = None
        self.identity_path = identity_path
        self.ssh_options = ssh_options
        self.config_file = config_file

    def common_options(self, port_flag):
        port_option = [port_flag, self.port] if self.port else []
        config_option = ['-F', self.config_file] if self.config_file else []
        identity_option = (
            ['-i', self.identity_path] if self.identity_path else [])
        # Interleave '-o' with each custom option.
        # From https://stackoverflow.com/a/8168526,
        # with explanatory help from
        # https://spapas.github.io/2016/04/27/python-nested-list-comprehensions/
        extra_options = [arg for option in self.ssh_options
                         for arg in ["-o", option]]
        return port_option + identity_option + config_option + extra_options

    def remote_invocation(self, command):
        return (['/usr/bin/ssh', '-n'] +
                self.common_options(port_flag='-p') +
                [self.remote_host, '--'] +
                [quote(arg) for arg in command])

    def rsync_invocation(self, source, dest):
        return ['rsync', '-arRz', '--files-from=-',
                '-e', ' '.join([quote(x) for x in
                                ['/usr/bin/ssh'] +
                                self.common_options(port_flag='-p')]),
                source,
                dest]

    def rsync_from_invocation(self, source, dest):
        return self.rsync_invocation(self.remote_host + ':' + source, dest)

    def rsync_to_invocation(self, source, dest):
        return self.rsync_invocation(source, self.remote_host + ':' + dest)


class LocalCommandRunner(CommandRunner):
    def remote_invocation(self, command):
        return command

    def rsync_invocation(self, source, dest):
        return ['rsync', '-arz', '--files-from=-', source, dest]

    def rsync_from_invocation(self, source, dest):
        return self.rsync_invocation(source, dest)

    def rsync_to_invocation(self, source, dest):
        return self.rsync_invocation(source, dest)

def strip_sep(name):
    lsep = len(posixpath.sep)
    while name.startswith(posixpath.sep):
        name = name[lsep:]
    return name


class RemotePathSet(object):
    def __init__(self):
        self.inputs = set()
        self.nodir_inputs = set()
        self.existing_outputs = set()
        self.outputs = set()
        self.existing_nodir_outputs = set()
        self.nodir_outputs = set()


class PrefixProcessor(object):
    def __init__(self,
                 input_prefix, output_prefix,
                 remote_dir, remote_input_prefix,
                 remote_output_prefix,
                 path_set):
        assert not remote_input_prefix.startswith('..')
        assert not remote_output_prefix.startswith('..')

        self.input_prefix = input_prefix
        self.output_prefix = output_prefix
        self.remote_dir = remote_dir
        self.remote_input_prefix = remote_input_prefix
        self.remote_output_prefix = remote_output_prefix
        self.path_set = path_set

        if self.input_prefix:
            while self.input_prefix.endswith(posixpath.sep):
                self.input_prefix = self.input_prefix[:-len(posixpath.sep)]
        if self.output_prefix:
            while self.output_prefix.endswith(posixpath.sep):
                self.output_prefix = self.output_prefix[:-len(posixpath.sep)]

        if self.input_prefix:
            split = posixpath.split(self.input_prefix)
            self.input_prefix_split = (len(self.input_prefix),
                                       split[0],
                                       split[1])
        else:
            self.input_prefix_split = (0, '', '')

        if self.output_prefix:
            split = posixpath.split(self.output_prefix)
            self.output_prefix_split = (len(self.output_prefix),
                                        split[0],
                                        split[1])
        else:
            self.output_prefix_split = (0, '', '')

    def process_one(self, orig_name):
        iplen, ipfxdir, ipfx = self.input_prefix_split
        oplen, opfxdir, opfx = self.output_prefix_split

        if iplen and orig_name.startswith(self.input_prefix):
            name = orig_name[iplen:]
            if not name.startswith(posixpath.sep):
                name = ipfx + name
                self.path_set.nodir_inputs.add(name)
            else:
                name = strip_sep(name)
                self.path_set.inputs.add(name)
            return posixpath.join(self.remote_dir,
                                  self.remote_input_prefix,
                                  name)

        if oplen and orig_name.startswith(self.output_prefix):
            name = orig_name[oplen:]
            if not name.startswith(posixpath.sep):
                name = opfx + name
                self.path_set.nodir_outputs.add(name)
                if os.path.exists(orig_name):
                    self.path_set.existing_nodir_outputs.add(name)
            else:
                name = strip_sep(name)
                self.path_set.outputs.add(name)
                if os.path.exists(orig_name):
                    self.path_set.existing_outputs.add(name)
            return posixpath.join(self.remote_dir,
                                  self.remote_output_prefix,
                                  name)

        return orig_name

class ArgumentProcessor(PrefixProcessor):
    def __init__(self,
                 input_prefix, output_prefix,
                 remote_dir, remote_input_prefix, remote_output_prefix,
                 path_set, arguments):
        super().__init__(input_prefix, output_prefix,
                         remote_dir, remote_input_prefix,
                         remote_output_prefix,
                         path_set)
        self.original_args = arguments

    def process_args(self):
        self.args = []

        for arg in self.original_args:
            self.args.append(self.process_one(arg))


class EnvVarProcessor(PrefixProcessor):
    def __init__(self,
                 input_prefix, output_prefix,
                 remote_dir, remote_input_prefix, remote_output_prefix,
                 path_set, environment):
        super().__init__(input_prefix, output_prefix,
                         remote_dir, remote_input_prefix,
                         remote_output_prefix,
                         path_set)
        self.original_env = environment

    def process_env(self):
        self.env = dict()

        for key, value in self.original_env.items():
            self.env[key] = self.process_one(value)


def collect_remote_env(local_env=os.environ, prefix='REMOTE_RUN_CHILD_'):
    return dict((key[len(prefix):], value)
                for key, value in local_env.items() if key.startswith(prefix))

def main():
    parser = argparse.ArgumentParser()

    parser.add_argument('-v', '--verbose', action='store_true', dest='verbose',
                        help='print commands as they are run')
    parser.add_argument('-n', '--dry-run', action='store_true', dest='dry_run',
                        help="print the commands that would have been run, but "
                             "don't actually run them")

    parser.add_argument('--remote-dir', required=True, metavar='PATH',
                        help='(required) a writable temporary path on the '
                             'remote machine')
    parser.add_argument('--input-prefix',
                        help='arguments matching this prefix will be uploaded')
    parser.add_argument('--output-prefix',
                        help='arguments matching this prefix will be both '
                             'uploaded and downloaded')
    parser.add_argument('--remote-input-prefix', default='input',
                        help='input arguments use this prefix on the remote '
                             'machine')
    parser.add_argument('--remote-output-prefix', default='output',
                        help='output arguments use this prefix on the remote '
                             'machine')

    parser.add_argument('-i', '--identity', dest='identity', metavar='FILE',
                        help='an SSH identity file (private key) to use')
    parser.add_argument('-F', '--config-file', dest='config_file',
                        metavar='FILE',
                        help='an SSH configuration file')
    parser.add_argument('-o', '--ssh-option', action='append', default=[],
                        dest='ssh_options', metavar='OPTION',
                        help='extra SSH config options (man ssh_config)')
    parser.add_argument('--debug-as-local', action='store_true',
                        help='run commands locally instead of over SSH, for '
                             'debugging purposes. The "host" argument is '
                             'omitted.')
    parser.add_argument('--ignore-rsync-failure', action='store_true',
                        help='ignore rsync failures, for debugging.')

    parser.add_argument('host',
                        help='the host to connect to, in the form '
                             '[user@]host[:port]')
    parser.add_argument('command', nargs=argparse.REMAINDER,
                        help='the command to run', metavar='command...')
    args = parser.parse_args()

    if args.debug_as_local:
        runner = LocalCommandRunner()
        args.command.insert(0, args.host)
        del args.host
    else:
        runner = RemoteCommandRunner(args.host,
                                     args.identity,
                                     args.ssh_options,
                                     args.config_file)
    runner.dry_run = args.dry_run
    runner.verbose = args.verbose or args.dry_run
    runner.ignore_rsync_failure = args.ignore_rsync_failure

    assert not args.remote_dir == '/'

    path_set = RemotePathSet()

    argproc = ArgumentProcessor(args.input_prefix,
                                args.output_prefix,
                                args.remote_dir,
                                posixpath.normpath(args.remote_input_prefix),
                                posixpath.normpath(args.remote_output_prefix),
                                path_set,
                                args.command)

    argproc.process_args()

    envproc = EnvVarProcessor(args.input_prefix,
                              args.output_prefix,
                              args.remote_dir,
                              posixpath.normpath(args.remote_input_prefix),
                              posixpath.normpath(args.remote_output_prefix),
                              path_set,
                              collect_remote_env())

    envproc.process_env()

    input_dir = posixpath.join(args.remote_dir, args.remote_input_prefix)
    output_dir = posixpath.join(args.remote_dir, args.remote_output_prefix)

    if args.output_prefix:
        runner.run_remote(['/bin/rm', '-rf', output_dir])

        dirs = set()
        for output in path_set.outputs:
            dirs.add(posixpath.join(output_dir, posixpath.dirname(output)))
        for output in path_set.nodir_outputs:
            dirs.add(posixpath.join(output_dir, posixpath.dirname(output)))
        runner.mkdirs_remote(list(dirs))

    if path_set.inputs:
        runner.send(args.input_prefix, input_dir, path_set.inputs)

    if path_set.nodir_inputs:
        runner.send(posixpath.dirname(args.input_prefix),
                    input_dir, path_set.nodir_inputs)

    if path_set.existing_outputs:
        runner.send(args.output_prefix, output_dir, path_set.existing_outputs)

    if path_set.existing_nodir_outputs:
        runner.send(posixpath.dirname(args.output_prefix),
                    output_dir, path_set.existing_nodir_outputs)

    runner.run_remote(argproc.args, envproc.env)

    if path_set.outputs:
        runner.fetch(args.output_prefix, output_dir, path_set.outputs)

    if path_set.nodir_outputs:
        runner.fetch(posixpath.dirname(args.output_prefix),
                     output_dir, path_set.nodir_outputs)


if __name__ == "__main__":
    main()
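For reference, the argument parser above implies an invocation along these lines (the host and paths are placeholders, not taken from the script):

    remote-run --remote-dir /tmp/remote-run \
        --input-prefix ./Inputs --output-prefix ./Outputs \
        user@device.example.org ./Inputs/run-test ./Outputs/result.log

Command arguments that start with the --input-prefix or --output-prefix values are rewritten to paths under --remote-dir; inputs are uploaded with rsync, the command runs over ssh, and outputs are fetched back afterwards.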