Make thread safe

This commit is contained in:
Yim Lee
2022-03-10 12:29:25 -08:00
parent a5adf2640b
commit 2adcd724cb
2 changed files with 99 additions and 11 deletions

View File

@@ -12,6 +12,12 @@
import Swift
#if canImport(Glibc)
import Glibc
#elseif os(Windows)
import WinSDK
#endif
public struct LocalTestingActorAddress: Hashable, Sendable, Codable {
public let address: String
@@ -30,7 +36,6 @@ public struct LocalTestingActorAddress: Hashable, Sendable, Codable {
}
}
// TODO(distributed): not thread safe...
@available(SwiftStdlib 5.7, *)
public final class LocalTestingDistributedActorSystem: DistributedActorSystem, @unchecked Sendable {
public typealias ActorID = LocalTestingActorAddress
@@ -39,15 +44,17 @@ public final class LocalTestingDistributedActorSystem: DistributedActorSystem, @
public typealias SerializationRequirement = Codable
private var activeActors: [ActorID: DistributedActor] = [:]
private let activeActorsLock = _Lock()
private var idProvider: ActorIDProvider = ActorIDProvider()
private var assignedIDs: Set<ActorID> = []
private let assignedIDsLock = _Lock()
public init() {}
public func resolve<Act>(id: ActorID, as actorType: Act.Type)
throws -> Act? where Act: DistributedActor {
guard let anyActor = self.activeActors[id] else {
guard let anyActor = self.activeActorsLock.withLock({ self.activeActors[id] }) else {
throw LocalTestingDistributedActorSystemError(message: "Unable to locate id '\(id)' locally")
}
guard let actor = anyActor as? Act else {
@@ -59,24 +66,30 @@ public final class LocalTestingDistributedActorSystem: DistributedActorSystem, @
public func assignID<Act>(_ actorType: Act.Type) -> ActorID
where Act: DistributedActor {
let id = self.idProvider.next()
self.assignedIDs.insert(id)
self.assignedIDsLock.withLock {
self.assignedIDs.insert(id)
}
return id
}
public func actorReady<Act>(_ actor: Act)
where Act: DistributedActor,
Act.ID == ActorID {
guard self.assignedIDs.contains(actor.id) else {
guard self.assignedIDsLock.withLock({ self.assignedIDs.contains(actor.id) }) else {
fatalError("Attempted to mark an unknown actor '\(actor.id)' ready")
}
self.activeActors[actor.id] = actor
self.activeActorsLock.withLock {
self.activeActors[actor.id] = actor
}
}
public func resignID(_ id: ActorID) {
guard self.assignedIDs.contains(id) else {
guard self.assignedIDsLock.withLock({ self.assignedIDs.contains(id) }) else {
fatalError("Attempted to resign unknown id '\(id)'")
}
self.activeActors.removeValue(forKey: id)
self.activeActorsLock.withLock {
self.activeActors.removeValue(forKey: id)
}
}
public func makeInvocationEncoder() -> InvocationEncoder {
@@ -109,15 +122,18 @@ public final class LocalTestingDistributedActorSystem: DistributedActorSystem, @
fatalError("Attempted to make remote call on actor \(actor) in a local-only actor system")
}
// TODO(distributed): not thread safe...
private struct ActorIDProvider {
private var counter: Int = 0
private let counterLock = _Lock()
init() {}
mutating func next() -> LocalTestingActorAddress {
self.counter += 1
return LocalTestingActorAddress(parse: "\(self.counter)")
let id: Int = self.counterLock.withLock {
self.counter += 1
return self.counter
}
return LocalTestingActorAddress(parse: "\(id)")
}
}
}
@@ -176,3 +192,75 @@ public struct LocalTestingDistributedActorSystemError: DistributedActorSystemErr
self.message = message
}
}
// === lock ----------------------------------------------------------------
fileprivate class _Lock {
#if os(Windows)
private let underlying: UnsafeMutablePointer<SRWLOCK>
#elseif os(Cygwin) || os(FreeBSD) || os(OpenBSD)
private let underlying: UnsafeMutablePointer<pthread_mutex_t?>
#elseif os(WASI)
// pthread is currently not available on WASI
#else
private let underlying: UnsafeMutablePointer<pthread_mutex_t>
#endif
deinit {
#if os(Windows)
// Mutexes do not need to be explicitly destroyed
#elseif os(WASI)
// WASI environment has only a single thread
#else
guard pthread_mutex_destroy(self.underlying) == 0 else {
fatalError("pthread_mutex_destroy failed")
}
#endif
#if !os(WASI)
self.underlying.deinitialize(count: 1)
self.underlying.deallocate()
#endif
}
init() {
#if os(Windows)
self.underlying = UnsafeMutablePointer.allocate(capacity: 1)
InitializeSRWLock(self.underlying)
#elseif os(WASI)
// WASI environment has only a single thread
#else
self.underlying = UnsafeMutablePointer.allocate(capacity: 1)
guard pthread_mutex_init(self.underlying, nil) == 0 else {
fatalError("pthread_mutex_init failed")
}
#endif
}
@discardableResult
func withLock<T>(_ body: () -> T) -> T {
#if os(Windows)
AcquireSRWLockExclusive(self.underlying)
#elseif os(WASI)
// WASI environment has only a single thread
#else
guard pthread_mutex_lock(self.underlying) == 0 else {
fatalError("pthread_mutex_lock failed")
}
#endif
defer {
#if os(Windows)
ReleaseSRWLockExclusive(self.underlying)
#elseif os(WASI)
// WASI environment has only a single thread
#else
guard pthread_mutex_unlock(self.underlying) == 0 else {
fatalError("pthread_mutex_unlock failed")
}
#endif
}
return body()
}
}