[Distributed] workaround for LocalTestingDAS crashes;

This happens to work, but is not a real fix; we are not handling well
types declared in a library evolution enabled library, which this DAS
is. Without this, at runtime, we crash in LocalTestingDAS using actor
initializers
This commit is contained in:
Konrad `ktoso` Malawski
2022-05-04 15:05:53 +09:00
parent 223d73cc47
commit bcd0f64e91
6 changed files with 738 additions and 689 deletions

View File

@@ -11,6 +11,7 @@
//===----------------------------------------------------------------------===//
import Swift
import _Concurrency
#if canImport(Darwin)
import Darwin
@@ -26,25 +27,33 @@ import WinSDK
/// for learning about `distributed actor` isolation, as well as early
/// prototyping stages of development where a real system is not necessary yet.
@available(SwiftStdlib 5.7, *)
public final class LocalTestingDistributedActorSystem: DistributedActorSystem, @unchecked Sendable {
public struct LocalTestingDistributedActorSystem: DistributedActorSystem, @unchecked Sendable {
public typealias ActorID = LocalTestingActorAddress
public typealias ResultHandler = LocalTestingInvocationResultHandler
public typealias InvocationEncoder = LocalTestingInvocationEncoder
public typealias InvocationDecoder = LocalTestingInvocationDecoder
public typealias SerializationRequirement = Codable
public typealias SerializationRequirement = any Codable
private var activeActors: [ActorID: any DistributedActor] = [:]
private let activeActorsLock = _Lock()
private var idProvider: ActorIDProvider = ActorIDProvider()
private var assignedIDs: Set<ActorID> = []
private let assignedIDsLock = _Lock()
@usableFromInline
final class _Storage {
public init() {}
var activeActors: [ActorID: any DistributedActor] = [:]
let activeActorsLock = _Lock()
var assignedIDs: Set<ActorID> = []
let assignedIDsLock = _Lock()
}
let storage: _Storage
var idProvider: ActorIDProvider = ActorIDProvider()
public init() {
storage = .init()
}
public func resolve<Act>(id: ActorID, as actorType: Act.Type)
throws -> Act? where Act: DistributedActor {
guard let anyActor = self.activeActorsLock.withLock({ self.activeActors[id] }) else {
throws -> Act? where Act: DistributedActor, Act.ID == ActorID {
guard let anyActor = storage.activeActorsLock.withLock({ self.storage.activeActors[id] }) else {
throw LocalTestingDistributedActorSystemError(message: "Unable to locate id '\(id)' locally")
}
guard let actor = anyActor as? Act else {
@@ -54,28 +63,25 @@ public final class LocalTestingDistributedActorSystem: DistributedActorSystem, @
}
public func assignID<Act>(_ actorType: Act.Type) -> ActorID
where Act: DistributedActor {
where Act: DistributedActor, Act.ID == ActorID {
let id = self.idProvider.next()
self.assignedIDsLock.withLock {
self.assignedIDs.insert(id)
storage.assignedIDsLock.withLock {
self.storage.assignedIDs.insert(id)
}
return id
}
public func actorReady<Act>(_ actor: Act)
where Act: DistributedActor,
Act.ID == ActorID {
guard self.assignedIDsLock.withLock({ self.assignedIDs.contains(actor.id) }) else {
where Act: DistributedActor, Act.ID == ActorID {
guard storage.assignedIDsLock.withLock({ self.storage.assignedIDs.contains(actor.id) }) else {
fatalError("Attempted to mark an unknown actor '\(actor.id)' ready")
}
self.activeActorsLock.withLock {
self.activeActors[actor.id] = actor
}
self.storage.activeActors[actor.id] = actor
}
public func resignID(_ id: ActorID) {
self.activeActorsLock.withLock {
self.activeActors.removeValue(forKey: id)
storage.activeActorsLock.withLock {
self.storage.activeActors.removeValue(forKey: id)
}
}
@@ -93,7 +99,7 @@ public final class LocalTestingDistributedActorSystem: DistributedActorSystem, @
where Act: DistributedActor,
Act.ID == ActorID,
Err: Error,
Res: SerializationRequirement {
Res: Codable {
fatalError("Attempted to make remote call to \(target) on actor \(actor) using a local-only actor system")
}
@@ -109,38 +115,40 @@ public final class LocalTestingDistributedActorSystem: DistributedActorSystem, @
fatalError("Attempted to make remote call to \(target) on actor \(actor) using a local-only actor system")
}
private struct ActorIDProvider {
@usableFromInline
final class ActorIDProvider {
private var counter: Int = 0
private let counterLock = _Lock()
@usableFromInline
init() {}
mutating func next() -> LocalTestingActorAddress {
func next() -> LocalTestingActorAddress {
let id: Int = self.counterLock.withLock {
self.counter += 1
return self.counter
}
return LocalTestingActorAddress(parse: "\(id)")
return LocalTestingActorAddress(id)
}
}
}
@available(SwiftStdlib 5.7, *)
public struct LocalTestingActorAddress: Hashable, Sendable, Codable {
public let address: String
public let uuid: String
public init(parse address: String) {
self.address = address
public init(_ uuid: Int) {
self.uuid = "\(uuid)"
}
public init(from decoder: Decoder) throws {
let container = try decoder.singleValueContainer()
self.address = try container.decode(String.self)
self.uuid = try container.decode(String.self)
}
public func encode(to encoder: Encoder) throws {
var container = encoder.singleValueContainer()
try container.encode(self.address)
try container.encode(self.uuid)
}
}
@@ -220,7 +228,7 @@ public struct LocalTestingDistributedActorSystemError: DistributedActorSystemErr
// === lock ----------------------------------------------------------------
@available(SwiftStdlib 5.7, *)
fileprivate class _Lock {
internal class _Lock {
#if os(iOS) || os(macOS) || os(tvOS) || os(watchOS)
private let underlying: UnsafeMutablePointer<os_unfair_lock>
#elseif os(Windows)
@@ -233,10 +241,27 @@ fileprivate class _Lock {
private let underlying: UnsafeMutablePointer<pthread_mutex_t>
#endif
init() {
#if os(iOS) || os(macOS) || os(tvOS) || os(watchOS)
self.underlying = UnsafeMutablePointer.allocate(capacity: 1)
self.underlying.initialize(to: os_unfair_lock())
#elseif os(Windows)
self.underlying = UnsafeMutablePointer.allocate(capacity: 1)
InitializeSRWLock(self.underlying)
#elseif os(WASI)
// WASI environment has only a single thread
#else
self.underlying = UnsafeMutablePointer.allocate(capacity: 1)
guard pthread_mutex_init(self.underlying, nil) == 0 else {
fatalError("pthread_mutex_init failed")
}
#endif
}
deinit {
#if os(iOS) || os(macOS) || os(tvOS) || os(watchOS)
// `os_unfair_lock`s do not need to be explicitly destroyed
#elseif os(Windows)
#elseif os(Windows)
// `SRWLOCK`s do not need to be explicitly destroyed
#elseif os(WASI)
// WASI environment has only a single thread
@@ -252,22 +277,6 @@ fileprivate class _Lock {
#endif
}
init() {
#if os(iOS) || os(macOS) || os(tvOS) || os(watchOS)
self.underlying = UnsafeMutablePointer.allocate(capacity: 1)
#elseif os(Windows)
self.underlying = UnsafeMutablePointer.allocate(capacity: 1)
InitializeSRWLock(self.underlying)
#elseif os(WASI)
// WASI environment has only a single thread
#else
self.underlying = UnsafeMutablePointer.allocate(capacity: 1)
guard pthread_mutex_init(self.underlying, nil) == 0 else {
fatalError("pthread_mutex_init failed")
}
#endif
}
@discardableResult
func withLock<T>(_ body: () -> T) -> T {
#if os(iOS) || os(macOS) || os(tvOS) || os(watchOS)