mirror of
https://github.com/apple/swift.git
synced 2025-12-14 20:36:38 +01:00
Support work-or-wait patterns a little better in AtomicWaitQueue.
Add a tryReloadAndWait method to Worker (which can only be used when not the worker thread). Revise the docs to describe this sort of pattern as the more standard pattern, which I think is likely to better reflect common use.
This commit is contained in:
@@ -120,6 +120,14 @@ public:
|
|||||||
/// An RAII helper class for signalling that the current thread is a
|
/// An RAII helper class for signalling that the current thread is a
|
||||||
/// worker thread which has acquired the lock.
|
/// worker thread which has acquired the lock.
|
||||||
///
|
///
|
||||||
|
/// `AtomicWaitQueue` does not require the global lock to be held
|
||||||
|
/// while creating or publishing the queue. Clients taking advantage
|
||||||
|
/// of this should inform the Worker class that a created queue has
|
||||||
|
/// been published by calling `flagQueueIsPublished`. Clients who
|
||||||
|
/// wish to publish the queue while holding the global lock, perhaps
|
||||||
|
/// to get a rule that all stores are done under the lock, may instead
|
||||||
|
/// use `tryPublishQueue`.
|
||||||
|
///
|
||||||
/// The expected use pattern is something like:
|
/// The expected use pattern is something like:
|
||||||
///
|
///
|
||||||
/// ```
|
/// ```
|
||||||
@@ -128,26 +136,47 @@ public:
|
|||||||
/// while (true) {
|
/// while (true) {
|
||||||
/// if (oldStatus.isDone()) return;
|
/// if (oldStatus.isDone()) return;
|
||||||
///
|
///
|
||||||
/// // Try to publish the wait queue. If this succeeds, we've become
|
/// if (oldStatus.hasWaitQueue()) {
|
||||||
/// // the worker thread.
|
/// bool waited = worker.tryReloadAndWait([&] {
|
||||||
/// bool published = worker.tryPublishQueue([&] {
|
/// oldStatus = myAtomic.load(std::memory_order_acquire);
|
||||||
/// auto newStatus = oldStatus.withLock(worker.createQueue());
|
/// return (oldStatus.hasWaitQueue() ? oldStatus.getWaitQueue()
|
||||||
/// return myAtomic.compare_exchange_weak(oldStatus, newStatus,
|
/// : nullptr);
|
||||||
|
/// });
|
||||||
|
///
|
||||||
|
/// // If we waited, `oldStatus` will be out of date; reload it.
|
||||||
|
/// //
|
||||||
|
/// // (For the pattern in this example, where the worker thread
|
||||||
|
/// // always transitions the status to done, this is actually
|
||||||
|
/// // unnecessary: by virtue of having successfully waited, we've
|
||||||
|
/// // synchronized with the worker thread and know that the status
|
||||||
|
/// // is done, so we could just return. But in general, this
|
||||||
|
/// // reload is necessary.)
|
||||||
|
/// if (waited)
|
||||||
|
/// oldStatus = myAtomic.load(std::memory_order_acquire);
|
||||||
|
///
|
||||||
|
/// // Go back and reconsider the updated status.
|
||||||
|
/// continue;
|
||||||
|
/// }
|
||||||
|
///
|
||||||
|
/// // Create a queue and try to publish it. If this succeeds,
|
||||||
|
/// // we've become the worker thread. We don't have to worry
|
||||||
|
/// // about the queue leaking if we don't use it; that's managed
|
||||||
|
/// // by the Worker class.
|
||||||
|
/// {
|
||||||
|
/// auto queue = worker.createQueue();
|
||||||
|
/// auto newStatus = oldStatus.withWaitQueue(queue);
|
||||||
|
/// if (!myAtomic.compare_exchange_weak(oldStatus, newStatus,
|
||||||
/// /*success*/ std::memory_order_release,
|
/// /*success*/ std::memory_order_release,
|
||||||
/// /*failure*/ std::memory_order_acquire);
|
/// /*failure*/ std::memory_order_acquire))
|
||||||
/// });
|
/// continue;
|
||||||
/// if (!published) continue;
|
/// worker.flagQueueIsPublished(queue);
|
||||||
|
/// }
|
||||||
///
|
///
|
||||||
/// // Do the actual work here.
|
/// // Do the actual work here.
|
||||||
///
|
///
|
||||||
/// // "Unpublish" the queue from the the atomic.
|
/// // Report that the work is done and "unpublish" the queue from
|
||||||
/// while (true) {
|
/// // the atomic.
|
||||||
/// auto newStatus = oldStatus.withDone(true);
|
/// myAtomic.store(oldStatus.withDone(true), std::memory_order_release);
|
||||||
/// if (myAtomic.compare_exchange_weak(oldStatus, newStatus,
|
|
||||||
/// /*success*/ std::memory_order_release,
|
|
||||||
/// /*failure*/ std::memory_order_acquire))
|
|
||||||
/// break;
|
|
||||||
/// }
|
|
||||||
/// worker.finishAndUnpublishQueue([]{});
|
/// worker.finishAndUnpublishQueue([]{});
|
||||||
/// return;
|
/// return;
|
||||||
/// }
|
/// }
|
||||||
@@ -193,6 +222,19 @@ public:
|
|||||||
return Published;
|
return Published;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Given that this thread is not the worker thread and there seems
|
||||||
|
/// to be a wait queue in place, try to wait on it.
|
||||||
|
///
|
||||||
|
/// Acquire the global lock and call the given function. If it
|
||||||
|
/// returns a wait queue, wait on that queue and return true;
|
||||||
|
/// otherwise, return false.
|
||||||
|
template <class Fn>
|
||||||
|
bool tryReloadAndWait(Fn &&fn) {
|
||||||
|
assert(!isWorkerThread());
|
||||||
|
typename Impl::Waiter waiter(GlobalLock);
|
||||||
|
return waiter.tryReloadAndWait(std::forward<Fn>(fn));
|
||||||
|
}
|
||||||
|
|
||||||
/// Given that this thread is the worker thread, return the queue
|
/// Given that this thread is the worker thread, return the queue
|
||||||
/// that's been created and published for it.
|
/// that's been created and published for it.
|
||||||
Impl *getPublishedQueue() const {
|
Impl *getPublishedQueue() const {
|
||||||
|
|||||||
Reference in New Issue
Block a user