[Distributed] Handle mangling thunks in extensions with generic AS and $Stubs (#71914)

This commit is contained in:
Konrad `ktoso` Malawski
2024-02-29 21:22:00 +09:00
committed by GitHub
parent 92f3b0f2c4
commit c56a1e8be7
29 changed files with 565 additions and 296 deletions

View File

@@ -223,6 +223,7 @@ public final class FakeRoundtripActorSystem: DistributedActorSystem, @unchecked
public typealias ResultHandler = FakeRoundtripResultHandler
var activeActors: [ActorID: any DistributedActor] = [:]
var forcedNextRemoteCallReply: Any? = nil
public init() {}
@@ -261,6 +262,10 @@ public final class FakeRoundtripActorSystem: DistributedActorSystem, @unchecked
private var remoteCallResult: Any? = nil
private var remoteCallError: Error? = nil
public func forceNextRemoteCallReply(_ reply: Any) {
self.forcedNextRemoteCallReply = reply
}
public func remoteCall<Act, Err, Res>(
on actor: Act,
target: RemoteCallTarget,
@@ -273,10 +278,16 @@ public final class FakeRoundtripActorSystem: DistributedActorSystem, @unchecked
Err: Error,
Res: SerializationRequirement {
print(" >> remoteCall: on:\(actor), target:\(target), invocation:\(invocation), throwing:\(String(reflecting: errorType)), returning:\(String(reflecting: returnType))")
print(" > execute distributed target: \(target), identifier: \(target.identifier)")
guard let targetActor = activeActors[actor.id] else {
fatalError("Attempted to call mock 'roundtrip' on: \(actor.id) without active actor: \(target.identifier)")
}
if let forcedNextRemoteCallReply {
defer { self.forcedNextRemoteCallReply = nil }
return forcedNextRemoteCallReply as! Res
}
func doIt<A: DistributedActor>(active: A) async throws -> Res {
guard (actor.id) == active.id as! ActorID else {
fatalError("Attempted to call mock 'roundtrip' on unknown actor: \(actor.id), known: \(active.id)")
@@ -292,7 +303,6 @@ public final class FakeRoundtripActorSystem: DistributedActorSystem, @unchecked
var decoder = invocation.makeDecoder()
print(" > execute distributed target: \(target), identifier: \(target.identifier)")
try await executeDistributedTarget(
on: active,
target: target,
@@ -503,6 +513,329 @@ public struct FakeRoundtripResultHandler: DistributedTargetInvocationResultHandl
}
}
// ==== CustomSerializationProtocol Transport ----------------------------------
public protocol CustomSerializationProtocol {
func toBytes() throws -> [UInt8]
static func fromBytes(_ bytes: [UInt8]) throws -> Self
}
extension ActorAddress {
func toBytes() throws -> [UInt8] {
var bytes: [UInt8] = []
bytes.reserveCapacity(address.count)
address.utf8CString.withUnsafeBytes { bs in
for b in bs {
bytes.append(b)
}
}
return bytes
}
func fromBytes(_ bytes: [UInt8]) throws -> ActorAddress {
let address = String(cString: bytes)
return Self.init(parse: address)
}
}
@available(SwiftStdlib 5.7, *)
public final class FakeCustomSerializationRoundtripActorSystem: DistributedActorSystem, @unchecked Sendable {
public typealias ActorID = ActorAddress
public typealias InvocationEncoder = FakeCustomSerializationInvocationEncoder
public typealias InvocationDecoder = FakeCustomSerializationInvocationDecoder
public typealias SerializationRequirement = CustomSerializationProtocol
public typealias ResultHandler = FakeCustomSerializationRoundtripResultHandler
var activeActors: [ActorID: any DistributedActor] = [:]
var forcedNextRemoteCallReply: Any? = nil
public init() {}
public func shutdown() {
self.activeActors = [:]
}
public func resolve<Act>(id: ActorID, as actorType: Act.Type)
throws -> Act? where Act: DistributedActor {
print("| resolve \(id) as remote // this system always resolves as remote")
return nil
}
public func assignID<Act>(_ actorType: Act.Type) -> ActorID
where Act: DistributedActor {
let id = ActorAddress(parse: "<unique-id>")
print("| assign id: \(id) for \(actorType)")
return id
}
public func actorReady<Act>(_ actor: Act)
where Act: DistributedActor,
Act.ID == ActorID {
print("| actor ready: \(actor)")
self.activeActors[actor.id] = actor
}
public func resignID(_ id: ActorID) {
print("X resign id: \(id)")
}
public func makeInvocationEncoder() -> InvocationEncoder {
.init()
}
private var remoteCallResult: Any? = nil
private var remoteCallError: Error? = nil
public func forceNextRemoteCallReply(_ reply: Any) {
self.forcedNextRemoteCallReply = reply
}
public func remoteCall<Act, Err, Res>(
on actor: Act,
target: RemoteCallTarget,
invocation: inout InvocationEncoder,
throwing errorType: Err.Type,
returning returnType: Res.Type
) async throws -> Res
where Act: DistributedActor,
Act.ID == ActorID,
Err: Error,
Res: SerializationRequirement {
print(" >> remoteCall: on:\(actor), target:\(target), invocation:\(invocation), throwing:\(String(reflecting: errorType)), returning:\(String(reflecting: returnType))")
print(" > execute distributed target: \(target), identifier: \(target.identifier)")
guard let targetActor = activeActors[actor.id] else {
fatalError("Attempted to call mock 'roundtrip' on: \(actor.id) without active actor: \(target.identifier)")
}
if let forcedNextRemoteCallReply {
defer { self.forcedNextRemoteCallReply = nil }
return forcedNextRemoteCallReply as! Res
}
func doIt<A: DistributedActor>(active: A) async throws -> Res {
guard (actor.id) == active.id as! ActorID else {
fatalError("Attempted to call mock 'roundtrip' on unknown actor: \(actor.id), known: \(active.id)")
}
let resultHandler = FakeCustomSerializationRoundtripResultHandler { value in
self.remoteCallResult = value
self.remoteCallError = nil
} onError: { error in
self.remoteCallResult = nil
self.remoteCallError = error
}
var decoder = invocation.makeDecoder()
try await executeDistributedTarget(
on: active,
target: target,
invocationDecoder: &decoder,
handler: resultHandler
)
switch (remoteCallResult, remoteCallError) {
case (.some(let value), nil):
print(" << remoteCall return: \(value)")
return remoteCallResult! as! Res
case (nil, .some(let error)):
print(" << remoteCall throw: \(error)")
throw error
default:
fatalError("No reply!")
}
}
return try await _openExistential(targetActor, do: doIt)
}
public func remoteCallVoid<Act, Err>(
on actor: Act,
target: RemoteCallTarget,
invocation: inout InvocationEncoder,
throwing errorType: Err.Type
) async throws
where Act: DistributedActor,
Act.ID == ActorID,
Err: Error {
print(" >> remoteCallVoid: on:\(actor), target:\(target), invocation:\(invocation), throwing:\(String(reflecting: errorType))")
guard let targetActor = activeActors[actor.id] else {
fatalError("Attempted to call mock 'roundtrip' on: \(actor.id) without active actor")
}
func doIt<A: DistributedActor>(active: A) async throws {
guard (actor.id) == active.id as! ActorID else {
fatalError("Attempted to call mock 'roundtrip' on unknown actor: \(actor.id), known: \(active.id)")
}
let resultHandler = FakeCustomSerializationRoundtripResultHandler { value in
self.remoteCallResult = value
self.remoteCallError = nil
} onError: { error in
self.remoteCallResult = nil
self.remoteCallError = error
}
var decoder = invocation.makeDecoder()
print(" > execute distributed target: \(target)")
try await executeDistributedTarget(
on: active,
target: target,
invocationDecoder: &decoder,
handler: resultHandler
)
switch (remoteCallResult, remoteCallError) {
case (.some, nil):
return
case (nil, .some(let error)):
print(" << remoteCall throw: \(error)")
throw error
default:
fatalError("No reply!")
}
}
try await _openExistential(targetActor, do: doIt)
}
}
@available(SwiftStdlib 5.7, *)
public struct FakeCustomSerializationInvocationEncoder : DistributedTargetInvocationEncoder {
public typealias SerializationRequirement = CustomSerializationProtocol
var genericSubs: [Any.Type] = []
var arguments: [Any] = []
var returnType: Any.Type? = nil
var errorType: Any.Type? = nil
public mutating func recordGenericSubstitution<T>(_ type: T.Type) throws {
print(" > encode generic sub: \(String(reflecting: type))")
genericSubs.append(type)
}
public mutating func recordArgument<Value: SerializationRequirement>(
_ argument: RemoteCallArgument<Value>) throws {
print(" > encode argument name:\(argument.label ?? "_"), value: \(argument.value)")
arguments.append(argument.value)
}
public mutating func recordErrorType<E: Error>(_ type: E.Type) throws {
print(" > encode error type: \(String(reflecting: type))")
self.errorType = type
}
public mutating func recordReturnType<R: SerializationRequirement>(_ type: R.Type) throws {
print(" > encode return type: \(String(reflecting: type))")
self.returnType = type
}
public mutating func doneRecording() throws {
print(" > done recording")
}
public mutating func makeDecoder() -> FakeCustomSerializationInvocationDecoder {
defer {
// reset the decoder; we don't want to keep these values retained by accident here
genericSubs = []
arguments = []
returnType = nil
errorType = nil
}
return .init(
args: arguments,
substitutions: genericSubs,
returnType: returnType,
errorType: errorType
)
}
}
// === decoding --------------------------------------------------------------
// !!! WARNING !!!
// This is a 'final class' on purpose, to see that we retain the ad-hoc witness
// for 'decodeNextArgument'; Do not change it to just a class!
@available(SwiftStdlib 5.7, *)
public final class FakeCustomSerializationInvocationDecoder: DistributedTargetInvocationDecoder {
public typealias SerializationRequirement = CustomSerializationProtocol
var genericSubs: [Any.Type] = []
var arguments: [Any] = []
var returnType: Any.Type? = nil
var errorType: Any.Type? = nil
var argumentIndex: Int = 0
fileprivate init(
args: [Any],
substitutions: [Any.Type] = [],
returnType: Any.Type? = nil,
errorType: Any.Type? = nil
) {
self.arguments = args
self.genericSubs = substitutions
self.returnType = returnType
self.errorType = errorType
}
public func decodeGenericSubstitutions() throws -> [Any.Type] {
print(" > decode generic subs: \(genericSubs)")
return genericSubs
}
public func decodeNextArgument<Argument: SerializationRequirement>() throws -> Argument {
guard argumentIndex < arguments.count else {
fatalError("Attempted to decode more arguments than stored! Index: \(argumentIndex), args: \(arguments)")
}
let anyArgument = arguments[argumentIndex]
guard let argument = anyArgument as? Argument else {
fatalError("Cannot cast argument\(anyArgument) to expected \(Argument.self)")
}
print(" > decode argument: \(argument)")
argumentIndex += 1
return argument
}
public func decodeErrorType() throws -> Any.Type? {
print(" > decode return type: \(errorType.map { String(reflecting: $0) } ?? "nil")")
return self.errorType
}
public func decodeReturnType() throws -> Any.Type? {
print(" > decode return type: \(returnType.map { String(reflecting: $0) } ?? "nil")")
return self.returnType
}
}
@available(SwiftStdlib 5.7, *)
public struct FakeCustomSerializationRoundtripResultHandler: DistributedTargetInvocationResultHandler {
public typealias SerializationRequirement = CustomSerializationProtocol
let storeReturn: (any Any) -> Void
let storeError: (any Error) -> Void
init(_ storeReturn: @escaping (Any) -> Void, onError storeError: @escaping (Error) -> Void) {
self.storeReturn = storeReturn
self.storeError = storeError
}
public func onReturn<Success: SerializationRequirement>(value: Success) async throws {
print(" << onReturn: \(value)")
storeReturn(value)
}
public func onReturnVoid() async throws {
print(" << onReturnVoid: ()")
storeReturn(())
}
public func onThrow<Err: Error>(error: Err) async throws {
print(" << onThrow: \(error)")
storeError(error)
}
}
// ==== Helpers ----------------------------------------------------------------
@available(SwiftStdlib 5.7, *)

View File

@@ -45,7 +45,7 @@ protocol G3<ActorSystem>: DistributedActor, EmptyBase where ActorSystem: Distrib
// CHECK: }
// CHECK: extension G3 where Self: Distributed._DistributedActorStub {
// CHECK: func get() -> String {
// CHECK: distributed func get() -> String {
// CHECK: if #available (SwiftStdlib 6.0, *) {
// CHECK: Distributed._distributedStubFatalError()
// CHECK: } else {

View File

@@ -57,7 +57,7 @@ func test_usedToBeAsync_but_remoteImplIsNotAnymore() async throws {
// so mangling based on the 'async throws' thunk does not introduce
// unexpected effects in remote calls.
//
// Design limitation by choice: this means we cannot
// Design limitation by choice: this means we cannot overload distributed methods on async-ness alone
target: RemoteCallTarget("$s4main7GreeterC28usedToHaveAsyncBytNotAnymoreSSyYaKFTE"),
invocation: &invocation,
throwing: Never.self,

View File

@@ -21,6 +21,7 @@ import FakeDistributedActorSystems
typealias DefaultDistributedActorSystem = FakeRoundtripActorSystem
protocol Greeting: DistributedActor {
// protocol Greeting: DistributedActor where ActorSystem: DistributedActorSystem<any Codable> {
distributed func greeting() -> String
distributed func greetingAsyncThrows() async throws -> String
}
@@ -35,7 +36,7 @@ extension Greeting {
}
}
extension Greeting where SerializationRequirement == Codable {
extension Greeting where ActorSystem: DistributedActorSystem<any Codable> {
// okay, uses Codable to transfer arguments.
distributed func greetDistributed(name: String) async throws {
// okay, we're on the actor
@@ -53,10 +54,11 @@ extension Greeting where SerializationRequirement == Codable {
}
}
extension Greeting where SerializationRequirement == Codable {
extension Greeting where ActorSystem: DistributedActorSystem<any Codable> {
nonisolated func greetAliceALot() async throws {
try await greetDistributed(name: "Alice") // okay, via Codable
let rawGreeting = try await greeting() // okay, via Self's serialization requirement
print("rawGreeting = \(rawGreeting)")
// greetLocal(name: "Alice") // would be error: only 'distributed' instance methods can be called on a potentially remote distributed actor}}
}
}

View File

@@ -15,81 +15,43 @@
import Distributed
import FakeDistributedActorSystems
// FIXME(distributed): enable via @_DistributedProtocol
protocol GreeterProtocol: DistributedActor {
@_DistributedProtocol
protocol GreeterProtocol: DistributedActor where ActorSystem: DistributedActorSystem<any Codable> {
distributed func greet() -> String
}
// TODO: remove manual stubs code
extension GreeterProtocol where Self == GreeterProtocol_FakeRoundtripActorSystem_Stub {
static func resolve(
id: ID, using system: ActorSystem
) throws -> any GreeterProtocol {
print("\(Self.self).\(#function) -> return \(GreeterProtocol_FakeRoundtripActorSystem_Stub.self)")
return try GreeterProtocol_FakeRoundtripActorSystem_Stub(actorSystem: system)
}
}
// TODO: remove manual stubs code
distributed actor GreeterProtocol_FakeRoundtripActorSystem_Stub: GreeterProtocol {
typealias ActorSystem = FakeRoundtripActorSystem
distributed func greet() -> String {
let message = "STUB:\(Self.self).\(#function)"
print(message)
return message
}
}
// TODO: remove manual stubs code
extension GreeterProtocol where Self == GreeterProtocol_LocalTestingDistributedActorSystem_Stub {
static func resolve(
id: Self.ID, using system: Self.ActorSystem
) throws -> any GreeterProtocol {
print("\(Self.self).\(#function) -> return \(GreeterProtocol_LocalTestingDistributedActorSystem_Stub.self)")
return try GreeterProtocol_LocalTestingDistributedActorSystem_Stub(actorSystem: system)
}
}
// TODO: remove manual stubs code
distributed actor GreeterProtocol_LocalTestingDistributedActorSystem_Stub: GreeterProtocol {
typealias ActorSystem = LocalTestingDistributedActorSystem
distributed func greet() -> String {
let message = "STUB:\(Self.self).\(#function)"
print(message)
return message
}
}
// ==== ------------------------------------------------------------------------
distributed actor DAF {
distributed actor DAFR: GreeterProtocol {
typealias ActorSystem = FakeRoundtripActorSystem
distributed func greet() -> String { "\(Self.self)" }
}
distributed actor DAL {
distributed actor DAFL: GreeterProtocol {
typealias ActorSystem = LocalTestingDistributedActorSystem
distributed func greet() -> String { "\(Self.self)" }
}
@main struct Main {
static func main() async throws {
let fakeRoundtripSystem = FakeRoundtripActorSystem()
let fid = fakeRoundtripSystem.assignID(DAF.self)
let fr = DAFR(actorSystem: fakeRoundtripSystem)
let frid = fr.id
let localTestingSystem = LocalTestingDistributedActorSystem()
let gid = localTestingSystem.assignID(DAL.self)
let gfr: any GreeterProtocol = try $GreeterProtocol.resolve(id: frid, using: fakeRoundtripSystem)
let gf: any GreeterProtocol = try .resolve(id: fid, using: fakeRoundtripSystem)
print("resolved on \(fakeRoundtripSystem): \(type(of: gf))")
// CHECK: resolved on main.FakeRoundtripActorSystem: GreeterProtocol_FakeRoundtripActorSystem_Stub
print()
print("resolved on \(fakeRoundtripSystem): \(type(of: gfr))")
// CHECK: resolved on main.FakeRoundtripActorSystem: $GreeterProtocol<FakeRoundtripActorSystem>
let gl: any GreeterProtocol = try .resolve(id: gid, using: localTestingSystem)
print("resolved on \(localTestingSystem): \(type(of: gl))")
// CHECK: resolved on Distributed.LocalTestingDistributedActorSystem: GreeterProtocol_LocalTestingDistributedActorSystem_Stub
// CHECK: > execute distributed target: main.$GreeterProtocol.greet(), identifier: $s4main16$GreeterProtocolC5greetSSyYaKFTE
// Notes:
// - The call is made on the stub: $GreeterProtocol
// - the record is name is 'HF' for the accessible function
// FIXME: remove this when we can properly roundtrip through new accessor
fakeRoundtripSystem.forceNextRemoteCallReply("FAKE")
let got = try await gfr.greet()
print("got: \(got)")
print("ok") // CHECK: ok
}

View File

@@ -36,10 +36,10 @@ final class Obj : Codable, Sendable {
}
struct LargeStruct : Codable {
var a: Int
var b: Int
var c: String
var d: Double
var a: Int
var b: Int
var c: String
var d: Double
}
@available(SwiftStdlib 5.7, *)
@@ -145,4 +145,4 @@ public distributed actor MyOtherActor {
// CHECK-SAME: @"$s27distributed_actor_accessors7MyActorC19with_indirect_enumsyAA9IndirectEOAF_SitYaKFTEHF"
// CHECK-SAME: @"$s27distributed_actor_accessors7MyActorC7complexyAA11LargeStructVSaySiG_AA3ObjCSSSgAFtYaKFTEHF"
// CHECK-SAME: @"$s27distributed_actor_accessors12MyOtherActorC5emptyyyYaKFTEHF"
// CHECK-SAME: ], section "llvm.metadata"
// CHECK-SAME: ], section "llvm.metadata"