#!/usr/bin/env python3 # ===--- Benchmark_RuntimeLeaksRunner.in ---------------------------------===// # # This source file is part of the Swift.org open source project # # Copyright (c) 2014 - 2017 Apple Inc. and the Swift project authors # Licensed under Apache License v2.0 with Runtime Library Exception # # See https://swift.org/LICENSE.txt for license information # See https://swift.org/CONTRIBUTORS.txt for the list of Swift project authors # # ===---------------------------------------------------------------------===// import json import os import subprocess import sys sys.path.append("@PATH_TO_DRIVER_LIBRARY@") import perf_test_driver # noqa (E402 module level import not at top of file) # This is a hacked up XFAIL list. It should really be a json file, but it will # work for now. Add in the exact name of the pass to XFAIL. XFAIL_LIST = [] # Global Objective-C classes created by various frameworks. We do not care # about these. IGNORABLE_GLOBAL_OBJC_CLASSES = set( [ "__NSPlaceholderDate", "NSCache", "__NSPlaceholderTimeZone", "NSPlaceholderNumber", "NSPlaceholderString", "__NSPlaceholderArray", "__NSPlaceholderDictionary", "_NSPlaceholderData", "_NSJSONReader", ] ) class LeaksRunnerResult(perf_test_driver.Result): def __init__(self, name, count=None): # True = 1, False = 0. # # TODO: Why are we using booleans to communicate status here? status = count is None or count > 0 perf_test_driver.Result.__init__(self, name, status, "", XFAIL_LIST) self.count = count def get_count(self): if self.count is not None: return str(self.count) return "N/A" def print_data(self, max_test_len): fmt = "{:<%d}{:<10}{:}" % (max_test_len + 5) print(fmt.format(self.get_name(), self.get_result(), self.get_count())) class LeaksRunnerBenchmarkDriver(perf_test_driver.BenchmarkDriver): def __init__(self, binary, xfail_list, num_samples, num_iters, verbose): perf_test_driver.BenchmarkDriver.__init__( self, binary, xfail_list, enable_parallel=True ) self.num_samples = num_samples self.num_iters = num_iters self.verbose = verbose def print_data_header(self, max_test_len): fmt = "{:<%d}{:<10}{:}" % (max_test_len + 5) print(fmt.format("Name", "Result", "RC Delta")) # Propagate any data from this class that is needed for individual # tests. The reason this is needed is to avoid issues with attempting to # access a value in a different process. def prepare_input(self, name): return {"num_samples": self.num_samples, "num_iters": self.num_iters} def run_test_inner(self, data, num_iters): p = subprocess.Popen( [ data["path"], "--num-samples={}".format(data["num_samples"]), "--num-iters={}".format(num_iters), data["test_name"], ], stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True, ) error_out = p.communicate()[1].split("\n") result = p.returncode if result is None: raise RuntimeError("Expected one line of output") if result != 0: raise RuntimeError("Process segfaulted") return error_out def run_test(self, data, num_iters): try: args = [data, num_iters] result = perf_test_driver.run_with_timeout(self.run_test_inner, args) except Exception as e: sys.stderr.write( "Child Process Failed! (%s,%s). Error: %s\n" % (data["path"], data["test_name"], e) ) sys.stderr.flush() return None try: # We grab the second line since swift globals get lazily created in # the first iteration. d = json.loads(result[1]) d["objc_objects"] = [ x for x in d["objc_objects"] if x not in IGNORABLE_GLOBAL_OBJC_CLASSES ] d["objc_count"] = len(d["objc_objects"]) # If we are asked to emit verbose output, do so now. if self.verbose: tmp = (data["path"], data["test_name"], d) sys.stderr.write( "VERBOSE (%s,%s): %s" % tmp) sys.stderr.flush() total_count = d["objc_count"] + d["swift_count"] return total_count except Exception: tmp = (data["path"], data["test_name"]) sys.stderr.write("Failed parse output! (%s,%s)\n" % tmp) sys.stderr.flush() return None def process_input(self, data): test_name = "({},{})".format(data["opt"], data["test_name"]) print("Running {}...".format(test_name)) sys.stdout.flush() total_count1 = self.run_test(data, data["num_iters"]) if total_count1 is None: return LeaksRunnerResult(test_name) total_count2 = self.run_test(data, data["num_iters"] + 1) if total_count2 is None: return LeaksRunnerResult(test_name) return LeaksRunnerResult(test_name, total_count2 - total_count1) SWIFT_BIN_DIR = os.path.dirname(os.path.abspath(__file__)) def parse_args(): import argparse parser = argparse.ArgumentParser() parser.add_argument( "-filter", type=str, default=None, help="Filter out any test that does not match the given regex", ) parser.add_argument("-num-samples", type=int, default=2) parser.add_argument("-num-iters", type=int, default=2) parser.add_argument("-v", "--verbose", action="store_true", help="Upon failure, dump out raw result", dest="verbose") return parser.parse_args() if __name__ == "__main__": args = parse_args() driver = LeaksRunnerBenchmarkDriver( SWIFT_BIN_DIR, XFAIL_LIST, args.num_samples, args.num_iters, args.verbose ) if driver.run(args.filter): sys.exit(0) else: sys.exit(-1)