Files
swift-mirror/stdlib/public/Distributed/LocalTestingDistributedActorSystem.swift
Konrad `ktoso` Malawski 8d3e7d91c0 [Distributed] ResultHandler.onReturn must be ad-hoc because SerializationRequirement (#41916)
* [Distributed] Invoke handler.onReturn ad hoc via ast synthesized func

* reformat and cleanup

* remove unused var
2022-03-24 21:44:37 +09:00

302 lines
9.7 KiB
Swift

//===----------------------------------------------------------------------===//
//
// This source file is part of the Swift.org open source project
//
// Copyright (c) 2022 Apple Inc. and the Swift project authors
// Licensed under Apache License v2.0 with Runtime Library Exception
//
// See https://swift.org/LICENSE.txt for license information
// See https://swift.org/CONTRIBUTORS.txt for the list of Swift project authors
//
//===----------------------------------------------------------------------===//
import Swift
#if canImport(Darwin)
import Darwin
#elseif canImport(Glibc)
import Glibc
#elseif os(Windows)
import WinSDK
#endif
/// A `DistributedActorSystem` designed for local only testing.
///
/// It will crash on any attempt of remote communication, but can be useful
/// for learning about `distributed actor` isolation, as well as early
/// prototyping stages of development where a real system is not necessary yet.
///
/// The system keeps two pieces of bookkeeping, each guarded by its own lock:
/// the set of IDs handed out by `assignID(_:)`, and the table of actors that
/// were announced through `actorReady(_:)`.
@available(SwiftStdlib 5.7, *)
public final class LocalTestingDistributedActorSystem: DistributedActorSystem, @unchecked Sendable {
  public typealias ActorID = LocalTestingActorAddress
  public typealias ResultHandler = LocalTestingInvocationResultHandler
  public typealias InvocationEncoder = LocalTestingInvocationEncoder
  public typealias InvocationDecoder = LocalTestingInvocationDecoder
  public typealias SerializationRequirement = Codable

  /// Actors announced through `actorReady(_:)`, keyed by their ID.
  /// Only read or written while holding `activeActorsLock`.
  private var activeActors: [ActorID: any DistributedActor] = [:]
  private let activeActorsLock = _Lock()

  /// Source of fresh actor IDs; its counter is guarded by its own lock.
  private var idProvider: ActorIDProvider = ActorIDProvider()

  /// IDs handed out by `assignID(_:)` that have not been resigned yet.
  /// Only read or written while holding `assignedIDsLock`.
  private var assignedIDs: Set<ActorID> = []
  private let assignedIDsLock = _Lock()

  public init() {}

  /// Looks up the actor registered under `id`.
  ///
  /// - Parameters:
  ///   - id: The ID to look up in the table of ready actors.
  ///   - actorType: The expected type of the actor.
  /// - Returns: The registered actor cast to `Act`. This implementation never
  ///   returns `nil`; every failure is reported by throwing.
  /// - Throws: `LocalTestingDistributedActorSystemError` when no actor is
  ///   registered under `id`, or when the registered actor is not an `Act`.
  public func resolve<Act>(id: ActorID, as actorType: Act.Type)
    throws -> Act? where Act: DistributedActor {
    guard let anyActor = self.activeActorsLock.withLock({ self.activeActors[id] }) else {
      throw LocalTestingDistributedActorSystemError(message: "Unable to locate id '\(id)' locally")
    }
    guard let actor = anyActor as? Act else {
      throw LocalTestingDistributedActorSystemError(message: "Failed to resolve id '\(id)' as \(Act.Type.self)")
    }
    return actor
  }

  /// Produces a fresh ID and records it as assigned.
  ///
  /// - Parameter actorType: The type of the actor the ID is assigned to
  ///   (not inspected by this implementation).
  /// - Returns: The newly generated ID.
  public func assignID<Act>(_ actorType: Act.Type) -> ActorID
    where Act: DistributedActor {
    let id = self.idProvider.next()
    self.assignedIDsLock.withLock {
      self.assignedIDs.insert(id)
    }
    return id
  }

  /// Registers `actor` so that it can be found by `resolve(id:as:)`.
  ///
  /// Traps when the actor's ID is not currently recorded as assigned, i.e. it
  /// was never returned by `assignID(_:)` or has already been resigned.
  public func actorReady<Act>(_ actor: Act)
    where Act: DistributedActor,
          Act.ID == ActorID {
    guard self.assignedIDsLock.withLock({ self.assignedIDs.contains(actor.id) }) else {
      fatalError("Attempted to mark an unknown actor '\(actor.id)' ready")
    }
    self.activeActorsLock.withLock {
      self.activeActors[actor.id] = actor
    }
  }

  /// Releases all bookkeeping associated with `id`.
  ///
  /// Removes the actor registered under `id` (if any) and forgets that the ID
  /// was assigned. IDs are generated from a strictly increasing counter and
  /// are never handed out twice, so keeping resigned IDs in `assignedIDs`
  /// would only make the set grow without bound and let a resigned ID pass
  /// the `actorReady(_:)` check.
  public func resignID(_ id: ActorID) {
    self.activeActorsLock.withLock {
      self.activeActors.removeValue(forKey: id)
    }
    // The two locks are taken one after the other, never nested.
    self.assignedIDsLock.withLock {
      self.assignedIDs.remove(id)
    }
  }

  /// Creates a new, empty invocation encoder.
  public func makeInvocationEncoder() -> InvocationEncoder {
    .init()
  }

  /// Always traps: this system does not support remote calls.
  public func remoteCall<Act, Err, Res>(
    on actor: Act,
    target: RemoteCallTarget,
    invocation: inout InvocationEncoder,
    throwing errorType: Err.Type,
    returning returnType: Res.Type
  ) async throws -> Res
    where Act: DistributedActor,
          Act.ID == ActorID,
          Err: Error,
          Res: SerializationRequirement {
    fatalError("Attempted to make remote call to \(target) on actor \(actor) using a local-only actor system")
  }

  /// Always traps: this system does not support remote calls.
  public func remoteCallVoid<Act, Err>(
    on actor: Act,
    target: RemoteCallTarget,
    invocation: inout InvocationEncoder,
    throwing errorType: Err.Type
  ) async throws
    where Act: DistributedActor,
          Act.ID == ActorID,
          Err: Error {
    fatalError("Attempted to make remote call to \(target) on actor \(actor) using a local-only actor system")
  }

  /// Generates addresses from an increasing integer counter.
  private struct ActorIDProvider {
    private var counter: Int = 0
    private let counterLock = _Lock()

    init() {}

    /// Increments the counter under the lock and returns the new value
    /// rendered as an address; the first address produced is "1".
    mutating func next() -> LocalTestingActorAddress {
      let id: Int = self.counterLock.withLock {
        self.counter += 1
        return self.counter
      }
      return LocalTestingActorAddress(parse: "\(id)")
    }
  }
}
/// Address identifying an actor in the local testing actor system.
///
/// The address wraps a single string. It is encoded and decoded as one bare
/// string value rather than as a keyed container.
@available(SwiftStdlib 5.7, *)
public struct LocalTestingActorAddress: Hashable, Sendable, Codable {
  /// The textual form of the address.
  public let address: String

  /// Creates an address that stores `address` verbatim.
  public init(parse address: String) {
    self.address = address
  }

  /// Decodes the address from a single-value container holding a string.
  ///
  /// - Throws: Whatever error the decoder reports when a single string value
  ///   cannot be read.
  public init(from decoder: Decoder) throws {
    self.address = try decoder.singleValueContainer().decode(String.self)
  }

  /// Encodes the address as a single string value.
  ///
  /// - Throws: Whatever error the encoder reports while writing the string.
  public func encode(to encoder: Encoder) throws {
    var singleValue = encoder.singleValueContainer()
    try singleValue.encode(address)
  }
}
/// Invocation encoder of the local-only testing actor system.
///
/// Every recording method traps with the same message instead of recording
/// anything.
@available(SwiftStdlib 5.7, *)
public struct LocalTestingInvocationEncoder: DistributedTargetInvocationEncoder {
  public typealias SerializationRequirement = Codable

  /// Message shared by every trapping method of this encoder.
  private static let unsupportedMessage =
    "Attempted to call encoder method in a local-only actor system"

  /// Always traps.
  public mutating func recordGenericSubstitution<T>(_ type: T.Type) throws {
    fatalError(Self.unsupportedMessage)
  }

  /// Always traps.
  public mutating func recordArgument<Value: SerializationRequirement>(_ argument: RemoteCallArgument<Value>) throws {
    fatalError(Self.unsupportedMessage)
  }

  /// Always traps.
  public mutating func recordErrorType<E: Error>(_ type: E.Type) throws {
    fatalError(Self.unsupportedMessage)
  }

  /// Always traps.
  public mutating func recordReturnType<R: SerializationRequirement>(_ type: R.Type) throws {
    fatalError(Self.unsupportedMessage)
  }

  /// Always traps.
  public mutating func doneRecording() throws {
    fatalError(Self.unsupportedMessage)
  }
}
/// Invocation decoder of the local-only testing actor system.
///
/// Every decoding method traps with the same message instead of decoding
/// anything.
@available(SwiftStdlib 5.7, *)
public class LocalTestingInvocationDecoder : DistributedTargetInvocationDecoder {
  public typealias SerializationRequirement = Codable

  /// Message shared by every trapping method of this decoder.
  private static let unsupportedMessage =
    "Attempted to call decoder method in a local-only actor system"

  /// Always traps.
  public func decodeGenericSubstitutions() throws -> [Any.Type] {
    fatalError(LocalTestingInvocationDecoder.unsupportedMessage)
  }

  /// Always traps.
  public func decodeNextArgument<Argument: SerializationRequirement>() throws -> Argument {
    fatalError(LocalTestingInvocationDecoder.unsupportedMessage)
  }

  /// Always traps.
  public func decodeErrorType() throws -> Any.Type? {
    fatalError(LocalTestingInvocationDecoder.unsupportedMessage)
  }

  /// Always traps.
  public func decodeReturnType() throws -> Any.Type? {
    fatalError(LocalTestingInvocationDecoder.unsupportedMessage)
  }
}
/// Result handler of the local-only testing actor system.
///
/// Every method traps instead of handling a result. The trap message names
/// the result handler so that a crash points at the component that was
/// actually invoked.
@available(SwiftStdlib 5.7, *)
public struct LocalTestingInvocationResultHandler: DistributedTargetInvocationResultHandler {
  public typealias SerializationRequirement = Codable

  /// Always traps.
  ///
  /// - Parameter value: The value returned by the invoked target (ignored).
  public func onReturn<Success: SerializationRequirement>(value: Success) async throws {
    fatalError("Attempted to call result handler method in a local-only actor system")
  }

  /// Always traps.
  public func onReturnVoid() async throws {
    fatalError("Attempted to call result handler method in a local-only actor system")
  }

  /// Always traps.
  ///
  /// - Parameter error: The error thrown by the invoked target (ignored).
  public func onThrow<Err: Error>(error: Err) async throws {
    fatalError("Attempted to call result handler method in a local-only actor system")
  }
}
// === errors ----------------------------------------------------------------
/// Error type of the local testing actor system, carrying a textual
/// description of the failure.
@available(SwiftStdlib 5.7, *)
public struct LocalTestingDistributedActorSystemError: DistributedActorSystemError {
  /// Description of what went wrong.
  public let message: String

  /// Creates an error that carries `message` unchanged.
  public init(message: String) {
    self.message = message
  }
}
// === lock ----------------------------------------------------------------
/// Minimal mutual-exclusion lock over the platform's native primitive.
///
/// The primitive is selected at compile time: `os_unfair_lock` on Apple
/// platforms, `SRWLOCK` on Windows, and a pthread mutex elsewhere. On WASI
/// no primitive is used and `withLock` simply runs its body.
@available(SwiftStdlib 5.7, *)
fileprivate class _Lock {
  #if os(iOS) || os(macOS) || os(tvOS) || os(watchOS)
  private let underlying: UnsafeMutablePointer<os_unfair_lock>
  #elseif os(Windows)
  private let underlying: UnsafeMutablePointer<SRWLOCK>
  #elseif os(WASI)
  // pthread is currently not available on WASI
  #elseif os(Cygwin) || os(FreeBSD) || os(OpenBSD)
  private let underlying: UnsafeMutablePointer<pthread_mutex_t?>
  #else
  private let underlying: UnsafeMutablePointer<pthread_mutex_t>
  #endif

  /// Allocates the storage for the native lock and puts it into its
  /// initial, unlocked state.
  init() {
    #if os(iOS) || os(macOS) || os(tvOS) || os(watchOS)
    self.underlying = UnsafeMutablePointer.allocate(capacity: 1)
    // `allocate` returns uninitialized memory. An unfair lock has no init
    // function, so its storage must be explicitly set to the default
    // (unlocked) value before the first lock operation.
    self.underlying.initialize(to: os_unfair_lock())
    #elseif os(Windows)
    self.underlying = UnsafeMutablePointer.allocate(capacity: 1)
    InitializeSRWLock(self.underlying)
    #elseif os(WASI)
    // WASI environment has only a single thread
    #else
    self.underlying = UnsafeMutablePointer.allocate(capacity: 1)
    guard pthread_mutex_init(self.underlying, nil) == 0 else {
      fatalError("pthread_mutex_init failed")
    }
    #endif
  }

  /// Destroys the native lock where the platform requires it and releases
  /// the storage allocated in `init()`.
  deinit {
    #if os(iOS) || os(macOS) || os(tvOS) || os(watchOS)
    // `os_unfair_lock`s do not need to be explicitly destroyed
    #elseif os(Windows)
    // `SRWLOCK`s do not need to be explicitly destroyed
    #elseif os(WASI)
    // WASI environment has only a single thread
    #else
    guard pthread_mutex_destroy(self.underlying) == 0 else {
      fatalError("pthread_mutex_destroy failed")
    }
    #endif

    #if !os(WASI)
    self.underlying.deinitialize(count: 1)
    self.underlying.deallocate()
    #endif
  }

  /// Runs `body` while holding the lock and returns its result.
  ///
  /// The lock is released in a `defer`, i.e. when this function exits after
  /// `body` has returned. Traps if the pthread lock or unlock call reports
  /// an error.
  ///
  /// - Parameter body: The work to perform while the lock is held.
  /// - Returns: The value produced by `body`.
  @discardableResult
  func withLock<T>(_ body: () -> T) -> T {
    #if os(iOS) || os(macOS) || os(tvOS) || os(watchOS)
    os_unfair_lock_lock(self.underlying)
    #elseif os(Windows)
    AcquireSRWLockExclusive(self.underlying)
    #elseif os(WASI)
    // WASI environment has only a single thread
    #else
    guard pthread_mutex_lock(self.underlying) == 0 else {
      fatalError("pthread_mutex_lock failed")
    }
    #endif

    defer {
      #if os(iOS) || os(macOS) || os(tvOS) || os(watchOS)
      os_unfair_lock_unlock(self.underlying)
      #elseif os(Windows)
      ReleaseSRWLockExclusive(self.underlying)
      #elseif os(WASI)
      // WASI environment has only a single thread
      #else
      guard pthread_mutex_unlock(self.underlying) == 0 else {
        fatalError("pthread_mutex_unlock failed")
      }
      #endif
    }

    return body()
  }
}