Reapply "Fix quadratic performance of the ListMerger in specific usage pattern"

This reverts commit 2640ff613b.
This commit is contained in:
Mykola Pokhylets
2024-05-30 13:07:13 +02:00
parent d812e11fcc
commit 6298d41edf
16 changed files with 863 additions and 983 deletions

View File

@@ -40,6 +40,7 @@ set(SWIFT_BENCH_MODULES
single-source/ArrayRemoveAll
single-source/ArraySetElement
single-source/ArraySubscript
single-source/AsyncTree
single-source/BinaryFloatingPointConversionFromBinaryInteger
single-source/BinaryFloatingPointProperties
single-source/BitCount

View File

@@ -0,0 +1,77 @@
//===--- AsyncTree.swift -------------------------------------------------===//
//
// This source file is part of the Swift.org open source project
//
// Copyright (c) 2021 Apple Inc. and the Swift project authors
// Licensed under Apache License v2.0 with Runtime Library Exception
//
// See https://swift.org/LICENSE.txt for license information
// See https://swift.org/CONTRIBUTORS.txt for the list of Swift project authors
//
//===----------------------------------------------------------------------===//
import TestsUtils
import Dispatch
public var benchmarks: [BenchmarkInfo] {
guard #available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) else {
return []
}
return [
BenchmarkInfo(
name: "AsyncTree.100",
runFunction: run_AsyncTree(treeSize: 100),
tags: [.concurrency]
),
BenchmarkInfo(
name: "AsyncTree.5000",
runFunction: run_AsyncTree(treeSize: 5000),
tags: [.concurrency]
)
]
}
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
private actor MyActor {
let g: DispatchGroup
init(_ g: DispatchGroup) {
self.g = g
}
func test(_ n: Int) {
let L = n / 2
let R = n - 1 - L
if L > 0 {
Task {
self.test(L)
}
}
if R > 0 {
Task {
self.test(R)
}
}
g.leave()
}
}
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
private func run_AsyncTree(treeSize: Int) -> (Int) -> Void {
return { n in
for _ in 0..<n {
let g = DispatchGroup()
for _ in 0..<treeSize {
g.enter()
}
let actor = MyActor(g)
Task {
await actor.test(treeSize)
}
g.wait()
}
}
}

View File

@@ -28,6 +28,7 @@ import ArrayOfRef
import ArrayRemoveAll
import ArraySetElement
import ArraySubscript
import AsyncTree
import BinaryFloatingPointConversionFromBinaryInteger
import BinaryFloatingPointProperties
import BitCount
@@ -223,6 +224,7 @@ register(ArrayOfRef.benchmarks)
register(ArrayRemoveAll.benchmarks)
register(ArraySetElement.benchmarks)
register(ArraySubscript.benchmarks)
register(AsyncTree.benchmarks)
register(BinaryFloatingPointConversionFromBinaryInteger.benchmarks)
register(BinaryFloatingPointProperties.benchmarks)
register(BitCount.benchmarks)

View File

@@ -2511,6 +2511,32 @@ inline int descendingPriorityOrder(JobPriority lhs,
return (lhs == rhs ? 0 : lhs > rhs ? -1 : 1);
}
enum { PriorityBucketCount = 5 };
inline int getPriorityBucketIndex(JobPriority priority) {
// Any unknown priorities will be rounded up to a known one.
// Priorities higher than UserInteractive are clamped to UserInteractive.
// Jobs of unknown priorities will end up in the same bucket as jobs of a
// corresponding known priority. Within the bucket they will be sorted in
// FIFO order.
if (priority > JobPriority::UserInitiated) {
// UserInteractive and higher
return 0;
} else if (priority > JobPriority::Default) {
// UserInitiated
return 1;
} else if (priority > JobPriority::Utility) {
// Default
return 2;
} else if (priority > JobPriority::Background) {
// Utility
return 3;
} else {
// Background and lower
return 4;
}
}
inline JobPriority withUserInteractivePriorityDowngrade(JobPriority priority) {
return (priority == JobPriority::UserInteractive) ? JobPriority::UserInitiated
: priority;

View File

@@ -0,0 +1,39 @@
//===--- HeaderFooterLayout.h -----------------------------------*- C++ -*-===//
//
// This source file is part of the Swift.org open source project
//
// Copyright (c) 2014 - 2024 Apple Inc. and the Swift project authors
// Licensed under Apache License v2.0 with Runtime Library Exception
//
// See https://swift.org/LICENSE.txt for license information
// See https://swift.org/CONTRIBUTORS.txt for the list of Swift project authors
//
//===----------------------------------------------------------------------===//
#ifndef SWIFT_BASIC_HEADER_FOOTER_LAYOUT_H
#define SWIFT_BASIC_HEADER_FOOTER_LAYOUT_H
namespace swift {
template <class Header, class Footer, size_t TotalSize>
struct HeaderFooterLayoutPadding {
private:
enum : ptrdiff_t {
maxFooterOffset = TotalSize - (ptrdiff_t)sizeof(Footer),
footerAlignment = (ptrdiff_t)alignof(Footer),
footerOffset = maxFooterOffset - (maxFooterOffset % footerAlignment),
size = footerOffset - (ptrdiff_t)sizeof(Header)
};
char padding[size];
};
template <class Header, class Footer, size_t TotalSize>
struct HeaderFooterLayout
: Header,
HeaderFooterLayoutPadding<Header, Footer, TotalSize>,
Footer {};
} // namespace swift
#endif // SWIFT_BASIC_HEADER_FOOTER_LAYOUT_H

View File

@@ -1,383 +0,0 @@
//===--- ListMerger.h - Merging sorted linked lists -------------*- C++ -*-===//
//
// This source file is part of the Swift.org open source project
//
// Copyright (c) 2014 - 2017 Apple Inc. and the Swift project authors
// Licensed under Apache License v2.0 with Runtime Library Exception
//
// See https://swift.org/LICENSE.txt for license information
// See https://swift.org/CONTRIBUTORS.txt for the list of Swift project authors
//
//===----------------------------------------------------------------------===//
//
// This file defines a class that helps with maintaining and merging a
// sorted linked list.
//
//===----------------------------------------------------------------------===//
#ifndef SWIFT_BASIC_LISTMERGER_H
#define SWIFT_BASIC_LISTMERGER_H
#include <assert.h>
namespace swift {
/// A class for building and merging sorted linked lists.
///
/// The `Node` type parameter represents a reference to a list node.
/// Conceptually, a `Node` value is either null or a reference to an
/// object with an abstract sort value and a `next` reference
/// (another `Node` value).
///
/// A null reference can be created by explicitly default-constructing
/// the `Node` type, e.g. with `Node()`. Converting a `Node` value
/// contextually to `bool` tests whether the node is a null reference.
/// `Node` values can be compared with the `==` and `!=` operators,
/// and equality with `Node()` is equivalent to a `bool` conversion.
/// These conditions are designed to allow pointer types to be used
/// directly, but they also permit other types. `ListMerger` is not
/// currently written to support smart pointer types efficiently,
/// however.
///
/// The sort value and `next` reference are not accessed directly;
/// instead, they are accessed with `static` functions on the
/// `NodeTraits` type parameter:
///
/// ```
/// /// Return the current value of the next reference.
/// static Node getNext(Node n);
///
/// /// Set the current value of the next reference.
/// static void setNext(Node n, Node next);
///
/// /// Compare the sort value of this node with that of another
/// /// node, returning negative (<), zero (==), or positive (>).
/// /// A node must compare equal to itself. A sorted list obeys
/// /// the condition that each node in the list compares <= the next.
/// static int compare(Node lhs, Node rhs);
/// ```
///
/// The merger holds a current list of nodes. The sort value and
/// next references of nodes must not be accessed after being added
/// to the merger and before being released except by the merger.
template <class Node, class NodeTraits>
class ListMerger {
Node root;
Node lastInsertionPoint = Node();
bool lastInsertionPointIsKnownLastOfEquals = false;
public:
/// Construct a merger with the given sorted list as its current list.
ListMerger(Node initialList = Node())
: root(initialList) {}
/// Add a single node to this merger's current list.
///
/// The next reference of the node will be overwritten and does not
/// need to be meaningful.
///
/// The relative order of nodes in the current list will not change,
/// and if there are nodes in the current list which compare equal
/// to the new node, it will be inserted after them.
void insert(Node newNode) {
assert(newNode && "inserting a null node");
Node prev = Node();
Node cur = root;
Node stopper = Node();
// If we have a previous insertion point, compare against it.
if (Node lastIP = lastInsertionPoint) {
int comparison = NodeTraits::compare(lastIP, newNode);
// If it compares equal, put the new node immediately after the
// last in the sequence of equals that contains it. This is a
// common fast path when we're adding many nodes that compare equal.
if (comparison == 0) {
lastIP = findLastOfEqualsFromLastIP(lastIP);
NodeTraits::setNext(newNode, NodeTraits::getNext(lastIP));
NodeTraits::setNext(lastIP, newNode);
setLastInsertionPoint(newNode, /*known last of equals*/ true);
return;
// If the new node must follow the last insertion node, we can
// at least start the search there.
} else if (comparison < 0) {
lastIP = findLastOfEqualsFromLastIP(lastIP);
prev = lastIP;
cur = NodeTraits::getNext(lastIP);
// Otherwise, we can at least end the search at the last inserted
// node.
} else {
stopper = lastIP;
}
}
// Invariants:
// root == [ ..., prev, cur, ... ]
// prev <= newRoot
// Scan forward looking for either `end` or a node that strictly
// follows the new node.
while (cur != stopper && NodeTraits::compare(cur, newNode) <= 0) {
prev = cur;
cur = NodeTraits::getNext(cur);
}
NodeTraits::setNext(newNode, cur);
if (prev) {
NodeTraits::setNext(prev, newNode);
} else {
root = newNode;
}
setLastInsertionPoint(newNode, /*known last of equals*/ true);
}
/// Add a single node to this merger's current list.
///
/// The next reference of the node will be overwritten and does not
/// need to be meaningful.
///
/// The relative order of nodes in the current list will not change,
/// and if there are nodes in the current list which compare equal
/// to the new node, it will be inserted *before* them.
///
/// This is useful for the pattern where nodes are naturally encountered
/// in the opposite of their desired order in the final list and
/// need to be reversed. It generally doesn't make any sense to mix
/// this with calls to insert or merge on the same merger.
void insertAtFront(Node newNode) {
assert(newNode && "inserting a null node");
auto insertBetween = [newNode, this](Node prev, Node next) {
if (prev) {
assert(NodeTraits::getNext(prev) == next);
assert(NodeTraits::compare(prev, newNode) < 0);
NodeTraits::setNext(prev, newNode);
} else {
assert(root == next);
root = newNode;
}
assert(!next || NodeTraits::compare(newNode, next) <= 0);
NodeTraits::setNext(newNode, next);
setLastInsertionPoint(prev, /*known last of equals*/ true);
};
Node prev = Node();
Node cur = root;
// If we have a previous insertion point, check for the presumed-common
// case that we're inserting something that should immediately follow it.
if (auto lastIP = lastInsertionPoint) {
lastIP = findLastOfEqualsFromLastIP(lastIP);
// Compare against the next node after lastIP, if it exists.
if (Node nextAfterLastIP = NodeTraits::getNext(lastIP)) {
int comparison = NodeTraits::compare(nextAfterLastIP, newNode);
// If the new node compares equal to the next node, insert here.
if (comparison == 0) {
insertBetween(lastIP, nextAfterLastIP);
return;
}
// If the new node should follow the next node, start scanning
// after it.
if (comparison < 0) {
prev = nextAfterLastIP;
cur = NodeTraits::getNext(nextAfterLastIP);
}
// Otherwise, we'll need to scan from the beginning.
// If there is no next node, compare against the previous.
} else {
int comparison = NodeTraits::compare(lastIP, newNode);
// If the new node should follow the last node, we can
// insert here.
if (comparison < 0) {
insertBetween(lastIP, Node());
return;
}
// Otherwise, we'll need to scan from the beginning.
}
}
assert(!prev || NodeTraits::compare(prev, newNode) < 0);
// Scan forward, looking for a node which the new node must be
// inserted prior to.
// Invariant: prev < newNode, if prev exists
while (cur) {
// Compare the new node against the current IP.
int comparison = NodeTraits::compare(cur, newNode);
// If the new node isn't strictly greater than cur, insert here.
if (comparison >= 0) break;
// Otherwise, continue.
prev = cur;
cur = NodeTraits::getNext(prev);
}
insertBetween(prev, cur);
}
/// Add a sorted list of nodes to this merger's current list.
/// The list must be well-formed (i.e. appropriately terminated).
///
/// The relative order of nodes in both the current and the new list
/// will not change. If there are nodes in the current list which
/// compare equal to nodes in the new list, they will appear before
/// the new nodes.
///
/// For example, if the current list is `[1@A, 1@B, 2@C]`, and the new
/// list is `[0@D, 1@E, 2@F]`, the current list after the merge will
/// be `[0@D, 1@A, 1@B, 1@E, 2@C, 2@F]`.
void merge(Node rootOfNewList) {
if (!rootOfNewList) return;
Node prev = Node();
Node cur = root;
Node stopper = Node();
// If we have a previous insertion point, compare the new root
// against it.
if (Node lastIP = lastInsertionPoint) {
int comparison = NodeTraits::compare(lastIP, rootOfNewList);
// If it compares equal, we've got an insertion point where
// we can place rootOfNewList: the end of the sequence of
// equals that includes lastIP. This is a common fast path
// when we have many nodes that compare equal.
if (comparison == 0) {
lastIP = findLastOfEqualsFromLastIP(lastIP);
prev = lastIP;
cur = NodeTraits::getNext(lastIP);
goto foundInsertionPoint; // seems to be the best option
// If the new node must follow the last insertion point, we can
// at least start the search there.
} else if (comparison < 0) {
lastIP = findLastOfEqualsFromLastIP(lastIP);
prev = lastIP;
cur = NodeTraits::getNext(lastIP);
// Otherwise, we can end the initial search at that position.
} else {
stopper = lastIP;
}
}
while (rootOfNewList) {
// Invariants:
// root == [ ..., prev, cur, ... ]
// prev <= rootOfNewList
// Check if the position between prev and cur is where we should
// insert the root of the new list.
if (cur != stopper && NodeTraits::compare(cur, rootOfNewList) <= 0) {
prev = cur;
cur = NodeTraits::getNext(cur);
continue;
}
// Place rootOfNewList at this position. Note that this might not be
// a proper splice because there may be nodes following prev that
// are now no longer reflected in the existing list.
if (!prev) {
root = rootOfNewList;
} else {
foundInsertionPoint:
NodeTraits::setNext(prev, rootOfNewList);
}
// If we've run out of nodes in the existing list, it *is*
// a proper splice, and we're done.
if (!cur) {
assert(!stopper);
setLastInsertionPoint(rootOfNewList, /*known end of equals*/ false);
return;
}
// If not, scan forward in the new list looking for a node that
// cur should precede.
Node prevInNewList = rootOfNewList;
Node curInNewList = NodeTraits::getNext(rootOfNewList);
while (curInNewList && NodeTraits::compare(cur, curInNewList) > 0) {
prevInNewList = curInNewList;
curInNewList = NodeTraits::getNext(curInNewList);
}
// prevInNewList < cur <= curInNewList (if it exists)
// Turn this:
// root == [ ..., prev, cur, ... ]
// rootOfNewList == [ ..., prevInNewList, curInNewList, ... ]
// into:
// root == [ ..., prev, rootOfNewList, ..., prevInNewList,
// cur, ... ]
// rootOfNewList' == [ curInNewList, ... ]
//
// Note that the next insertion point we'll check is *after* cur,
// since we know that cur <= curInNewList.
NodeTraits::setNext(prevInNewList, cur);
rootOfNewList = curInNewList;
prev = cur;
cur = NodeTraits::getNext(cur);
setLastInsertionPoint(prevInNewList, /*known end of equals*/ true);
// Any stopper we have was only known to exceed the original root
// node of the new list, which we've now inserted. From now on,
// we'll need to scan to the end of the list.
stopper = Node();
}
}
/// Get the current list that's been built up, and clear the internal
/// state of this merger.
Node release() {
Node result = root;
root = Node();
lastInsertionPoint = Node();
return result;
}
private:
/// Set the last point at which we inserted a node, and specify
/// whether we know it was the last in its sequence of equals.
void setLastInsertionPoint(Node lastIP, bool knownEndOfEquals) {
lastInsertionPoint = lastIP;
lastInsertionPointIsKnownLastOfEquals = knownEndOfEquals;
}
/// Given the value of lastInsertionPoint (passed in to avoid
/// reloading it), find the last node in the sequence of equals that
/// contains it.
Node findLastOfEqualsFromLastIP(Node lastIP) const {
assert(lastIP == lastInsertionPoint);
if (!lastInsertionPointIsKnownLastOfEquals)
return findLastOfEquals(lastIP);
return lastIP;
}
/// Find the last node in the sequence of equals that contains `node`.
static Node findLastOfEquals(Node node) {
while (Node next = NodeTraits::getNext(node)) {
int comparison = NodeTraits::compare(node, next);
assert(comparison <= 0 && "list is out of order");
if (comparison < 0) break;
node = next;
}
return node;
}
};
} // end namespace swift
#endif

View File

@@ -0,0 +1,153 @@
//===--- PriorityQueue.h - Merging sorted linked lists ----------*- C++ -*-===//
//
// This source file is part of the Swift.org open source project
//
// Copyright (c) 2014 - 2017 Apple Inc. and the Swift project authors
// Licensed under Apache License v2.0 with Runtime Library Exception
//
// See https://swift.org/LICENSE.txt for license information
// See https://swift.org/CONTRIBUTORS.txt for the list of Swift project authors
//
//===----------------------------------------------------------------------===//
//
// This file defines a class that helps with maintaining and merging a
// sorted linked list.
//
//===----------------------------------------------------------------------===//
#ifndef SWIFT_BASIC_PRIORITYQUEUE_H
#define SWIFT_BASIC_PRIORITYQUEUE_H
#include <cassert>
namespace swift {
/// A class for priority FIFO queue with a fixed number of priorities.
///
/// The `Node` type parameter represents a reference to a list node.
/// Conceptually, a `Node` value is either null or a reference to an
/// object with an abstract sort value and a `next` reference
/// (another `Node` value).
///
/// A null reference can be created by explicitly default-constructing
/// the `Node` type, e.g. with `Node()`. Converting a `Node` value
/// contextually to `bool` tests whether the node is a null reference.
/// `Node` values can be compared with the `==` and `!=` operators,
/// and equality with `Node()` is equivalent to a `bool` conversion.
/// These conditions are designed to allow pointer types to be used
/// directly, but they also permit other types. `ListMerger` is not
/// currently written to support smart pointer types efficiently,
/// however.
///
/// The sort value and `next` reference are not accessed directly;
/// instead, they are accessed with `static` functions on the
/// `NodeTraits` type parameter:
///
/// ```
/// /// Return the current value of the next reference.
/// static Node getNext(Node n);
///
/// /// Set the current value of the next reference.
/// static void setNext(Node n, Node next);
///
/// /// Total number of priority buckets.
/// enum { prioritiesCount = ... };
///
/// /// Returns priority of the Node as value between 0 and `prioritiesCount-1`.
/// /// Smaller indices have higher priority.
/// static int getPriorityIndex(Node);
/// ```
///
/// All nodes are stored in a single linked list, sorted by priority.
/// Within the same priority jobs are sorted in the FIFO order.
///
template <class Node, class NodeTraits>
class PriorityQueue {
private:
/// Head of the linked list.
Node head;
/// Last node of the corresponding priority, or null if no nodes of that
/// priority exist.
Node tails[NodeTraits::prioritiesCount];
public:
PriorityQueue() : head(), tails{} {}
/// Add a single node to this queue.
///
/// The next reference of the node will be overwritten and does not
/// need to be meaningful.
///
/// The relative order of the existing nodes in the queue will not change,
/// and if there are nodes in the current list which compare equal
/// to the new node, it will be inserted after them.
void enqueue(Node newNode) {
assert(newNode && "inserting a null node");
int priorityIndex = NodeTraits::getPriorityIndex(newNode);
enqueueRun(priorityIndex, newNode, newNode);
}
/// Add a chain of nodes of mixed priorities to this queue.
void enqueueContentsOf(Node otherHead) {
if (!otherHead) return;
Node runHead = otherHead;
int priorityIndex = NodeTraits::getPriorityIndex(runHead);
do {
// Find run of jobs of the same priority
Node runTail = runHead;
Node next = NodeTraits::getNext(runTail);
int nextRunPriorityIndex = badIndex;
while (next) {
nextRunPriorityIndex = NodeTraits::getPriorityIndex(next);
if (nextRunPriorityIndex != priorityIndex) break;
runTail = next;
next = NodeTraits::getNext(runTail);
}
enqueueRun(priorityIndex, runHead, runTail);
runHead = next;
priorityIndex = nextRunPriorityIndex;
} while(runHead);
}
Node dequeue() {
if (!head) {
return head;
}
auto result = head;
int resultIndex = NodeTraits::getPriorityIndex(result);
head = NodeTraits::getNext(result);
if (!head || resultIndex != NodeTraits::getPriorityIndex(head)) {
tails[resultIndex] = Node();
}
return result;
}
Node peek() const { return head; }
bool empty() const { return !head; }
private:
// Use large negative value to increase chance of causing segfault if this
// value ends up being used for indexing. Using -1 would cause accessing
// `head`, which is less noticeable.
static const int badIndex = std::numeric_limits<int>::min();
void enqueueRun(int priorityIndex, Node runHead, Node runTail) {
for (int i = priorityIndex;; i--) {
if (i < 0) {
NodeTraits::setNext(runTail, head);
head = runHead;
break;
}
if (tails[i]) {
NodeTraits::setNext(runTail, NodeTraits::getNext(tails[i]));
NodeTraits::setNext(tails[i], runHead);
break;
}
}
tails[priorityIndex] = runTail;
}
};
} // end namespace swift
#endif

View File

@@ -23,7 +23,8 @@
#include "swift/ABI/Actor.h"
#include "swift/ABI/Task.h"
#include "TaskPrivate.h"
#include "swift/Basic/ListMerger.h"
#include "swift/Basic/HeaderFooterLayout.h"
#include "swift/Basic/PriorityQueue.h"
#include "swift/Concurrency/Actor.h"
#include "swift/Runtime/AccessibleFunction.h"
#include "swift/Runtime/Atomic.h"
@@ -663,66 +664,6 @@ public:
}
};
class JobRef {
enum : uintptr_t {
NeedsPreprocessing = 0x1,
JobMask = ~uintptr_t(NeedsPreprocessing)
};
/// A Job* that may have one of the two bits above mangled into it.
uintptr_t Value;
JobRef(Job *job, unsigned flags)
: Value(reinterpret_cast<uintptr_t>(job) | flags) {}
public:
constexpr JobRef() : Value(0) {}
/// Return a reference to a job that's been properly preprocessed.
static JobRef getPreprocessed(Job *job) {
/// We allow null pointers here.
return { job, 0 };
}
/// Return a reference to a job that hasn't been preprocessed yet.
static JobRef getUnpreprocessed(Job *job) {
assert(job && "passing a null job");
return { job, NeedsPreprocessing };
}
/// Is this a null reference?
operator bool() const { return Value != 0; }
/// Does this job need to be pre-processed before we can treat
/// the job queue as a proper queue?
bool needsPreprocessing() const {
return Value & NeedsPreprocessing;
}
/// Is this an unprocessed message to the actor, rather than a job?
bool isMessage() const {
return false; // For now, we have no messages
}
Job *getAsJob() const {
assert(!isMessage());
return reinterpret_cast<Job*>(Value & JobMask);
}
Job *getAsPreprocessedJob() const {
assert(!isMessage() && !needsPreprocessing());
return reinterpret_cast<Job*>(Value);
}
/// Get the Job pointer with no preconditions on its type, for tracing.
Job *getRawJob() const { return reinterpret_cast<Job *>(Value & JobMask); }
bool operator==(JobRef other) const {
return Value == other.Value;
}
bool operator!=(JobRef other) const {
return Value != other.Value;
}
};
/// Similar to the ActiveTaskStatus, this denotes the ActiveActorState for
/// tracking the atomic state of the actor
///
@@ -735,16 +676,17 @@ public:
/// * Pointer to list of jobs enqueued in actor
///
/// It is important for all of this information to be in the same atomic so that
/// when the actor's state changes, the information is visible to all threads that
/// may be modifying the actor, allowing the algorithm to eventually converge.
/// when the actor's state changes, the information is visible to all threads
/// that may be modifying the actor, allowing the algorithm to eventually
/// converge.
///
/// In order to provide priority escalation support with actors, deeper integration is
/// required with the OS in order to have the intended side effects. On Darwin, Swift
/// Concurrency Tasks runs on dispatch's queues. As such, we need to use an
/// encoding of thread identity vended by libdispatch called dispatch_lock_t,
/// and a futex-style dispatch API in order to escalate the priority of a
/// thread. Henceforth, the dispatch_lock_t tracked in the ActiveActorStatus
/// will be called the DrainLock.
/// In order to provide priority escalation support with actors, deeper
/// integration is required with the OS in order to have the intended side
/// effects. On Darwin, Swift Concurrency Tasks runs on dispatch's queues. As
/// such, we need to use an encoding of thread identity vended by libdispatch
/// called dispatch_lock_t, and a futex-style dispatch API in order to escalate
/// the priority of a thread. Henceforth, the dispatch_lock_t tracked in the
/// ActiveActorStatus will be called the DrainLock.
///
/// When a thread starts running on an actor, it's identity is recorded in the
/// ActiveActorStatus. This way, if a higher priority job is enqueued behind the
@@ -760,25 +702,25 @@ public:
///
/// 32 bit systems with SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION=1
///
/// Flags Drain Lock Unused JobRef
/// Flags Drain Lock Unused Job*
/// |----------------------|----------------------|----------------------|-------------------|
/// 32 bits 32 bits 32 bits 32 bits
///
/// 64 bit systems with SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION=1
///
/// Flags Drain Lock JobRef
/// Flags Drain Lock Job*
/// |----------------------|-------------------|----------------------|
/// 32 bits 32 bits 64 bits
///
/// 32 bit systems with SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION=0
///
/// Flags JobRef
/// Flags Job*
/// |----------------------|----------------------|
/// 32 bits 32 bits
//
/// 64 bit systems with SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION=0
///
/// Flags Unused JobRef
/// Flags Unused Job*
/// |----------------------|----------------------|---------------------|
/// 32 bits 32 bits 64 bits
///
@@ -816,14 +758,13 @@ class alignas(sizeof(void *) * 2) ActiveActorStatus {
uint32_t Flags;
LLVM_ATTRIBUTE_UNUSED uint32_t Unused = {};
#endif
JobRef FirstJob;
Job *FirstJob;
#if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION
ActiveActorStatus(uint32_t flags, dispatch_lock_t drainLockValue, JobRef job)
: Flags(flags), DrainLock(drainLockValue), FirstJob(job) {}
ActiveActorStatus(uint32_t flags, dispatch_lock_t drainLockValue, Job *job)
: Flags(flags), DrainLock(drainLockValue), FirstJob(job) {}
#else
ActiveActorStatus(uint32_t flags, JobRef job)
: Flags(flags), FirstJob(job) {}
ActiveActorStatus(uint32_t flags, Job *job) : Flags(flags), FirstJob(job) {}
#endif
uint32_t getActorState() const {
@@ -844,10 +785,9 @@ public:
#if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION
constexpr ActiveActorStatus()
: Flags(), DrainLock(DLOCK_OWNER_NULL), FirstJob(JobRef()) {}
: Flags(), DrainLock(DLOCK_OWNER_NULL), FirstJob(nullptr) {}
#else
constexpr ActiveActorStatus()
: Flags(), FirstJob(JobRef()) {}
constexpr ActiveActorStatus() : Flags(), FirstJob(nullptr) {}
#endif
bool isIdle() const {
@@ -974,10 +914,8 @@ public:
#endif
}
JobRef getFirstJob() const {
return FirstJob;
}
ActiveActorStatus withFirstJob(JobRef firstJob) const {
Job *getFirstUnprioritisedJob() const { return FirstJob; }
ActiveActorStatus withFirstUnprioritisedJob(Job *firstJob) const {
#if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION
return ActiveActorStatus(Flags, DrainLock, firstJob);
#else
@@ -1022,12 +960,34 @@ public:
break;
}
concurrency::trace::actor_state_changed(
actor, getFirstJob().getRawJob(), getFirstJob().needsPreprocessing(),
traceState, distributedActorIsRemote,
actor, getFirstUnprioritisedJob(), traceState, distributedActorIsRemote,
isMaxPriorityEscalated(), static_cast<uint8_t>(getMaxPriority()));
}
};
#if !SWIFT_CONCURRENCY_ACTORS_AS_LOCKS
/// Given that a job is enqueued normally on a default actor, get/set
/// the next job in the actor's queue.
static Job *getNextJob(Job *job) {
return *reinterpret_cast<Job **>(job->SchedulerPrivate);
}
static void setNextJob(Job *job, Job *next) {
*reinterpret_cast<Job **>(job->SchedulerPrivate) = next;
}
struct JobQueueTraits {
static Job *getNext(Job *job) { return getNextJob(job); }
static void setNext(Job *job, Job *next) { setNextJob(job, next); }
enum { prioritiesCount = PriorityBucketCount };
static int getPriorityIndex(Job *job) {
return getPriorityBucketIndex(job->getPriority());
}
};
#endif
#if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION && SWIFT_POINTER_IS_4_BYTES
#define ACTIVE_ACTOR_STATUS_SIZE (4 * (sizeof(uintptr_t)))
#else
@@ -1037,6 +997,45 @@ static_assert(sizeof(ActiveActorStatus) == ACTIVE_ACTOR_STATUS_SIZE,
"ActiveActorStatus is of incorrect size");
#endif /* !SWIFT_CONCURRENCY_ACTORS_AS_LOCKS */
class DefaultActorImplHeader : public HeapObject {
protected:
#if SWIFT_CONCURRENCY_ACTORS_AS_LOCKS
// If actors are locks, we don't need to maintain any extra bookkeeping in the
// ActiveActorStatus since all threads which are contending will block
// synchronously, no job queue is needed and the lock will handle all priority
// escalation logic
Mutex drainLock;
#else
// Note: There is some padding that is added here by the compiler in order to
// enforce alignment. This is space that is available for us to use in
// the future
alignas(sizeof(ActiveActorStatus)) char StatusStorage[sizeof(ActiveActorStatus)];
#endif
// TODO (rokhinip): Make this a flagset
bool isDistributedRemoteActor;
};
// All the fields accessed under the actor's lock should be moved
// to the end of the default-actor reservation to minimize false sharing.
// The memory following the DefaultActorImpl object are the stored properties of
// the actor, which are all accessed only by the current processing thread.
class DefaultActorImplFooter {
protected:
#if !SWIFT_CONCURRENCY_ACTORS_AS_LOCKS
using PriorityQueue = swift::PriorityQueue<Job *, JobQueueTraits>;
// When enqueued, jobs are atomically added to a linked list with the head
// stored inside ActiveActorStatus. This list contains jobs in the LIFO order
// regardless of their priorities.
//
// When the processing thread sees new incoming jobs in
// ActiveActorStatus, it reverses them and inserts them into
// prioritizedJobs in the appropriate priority bucket.
//
PriorityQueue prioritizedJobs;
#endif
};
/// The default actor implementation.
///
/// Ownership of the actor is subtle. Jobs are assumed to keep the actor
@@ -1087,22 +1086,10 @@ static_assert(sizeof(ActiveActorStatus) == ACTIVE_ACTOR_STATUS_SIZE,
/// processing job for an actor at a given time. Stealers jobs support does not
/// exist yet. As a result, the subset of rules that currently apply
/// are (1), (3), (5), (6).
class DefaultActorImpl : public HeapObject {
#if SWIFT_CONCURRENCY_ACTORS_AS_LOCKS
// If actors are locks, we don't need to maintain any extra bookkeeping in the
// ActiveActorStatus since all threads which are contending will block
// synchronously, no job queue is needed and the lock will handle all priority
// escalation logic
Mutex drainLock;
#else
// Note: There is some padding that is added here by the compiler in order to
// enforce alignment. This is space that is available for us to use in
// the future
alignas(sizeof(ActiveActorStatus)) char StatusStorage[sizeof(ActiveActorStatus)];
#endif
// TODO (rokhinip): Make this a flagset
bool isDistributedRemoteActor;
class DefaultActorImpl
: public HeaderFooterLayout<DefaultActorImplHeader, DefaultActorImplFooter,
sizeof(HeapObject) +
sizeof(void *) * NumWords_DefaultActor> {
public:
/// Properly construct an actor, except for the heap header.
void initialize(bool isDistributedRemote = false) {
@@ -1111,6 +1098,7 @@ public:
new (&this->drainLock) Mutex();
#else
_status().store(ActiveActorStatus(), std::memory_order_relaxed);
new (&this->prioritizedJobs) PriorityQueue();
#endif
SWIFT_TASK_DEBUG_LOG("Creating default actor %p", this);
concurrency::trace::actor_create(this);
@@ -1138,8 +1126,13 @@ public:
/// new priority
void enqueueStealer(Job *job, JobPriority priority);
// The calling thread must be holding the actor lock while calling this
/// Dequeues one job from `prioritisedJobs`.
/// The calling thread must be holding the actor lock while calling this
Job *drainOne();
/// Atomically claims incoming jobs from ActiveActorStatus, and calls `handleUnprioritizedJobs()`.
/// Called with actor lock held on current thread.
void processIncomingQueue();
#endif
/// Check if the actor is actually a distributed *remote* actor.
@@ -1150,11 +1143,11 @@ public:
#if !SWIFT_CONCURRENCY_ACTORS_AS_LOCKS
swift::atomic<ActiveActorStatus> &_status() {
return reinterpret_cast<swift::atomic<ActiveActorStatus>&> (this->StatusStorage);
return reinterpret_cast<swift::atomic<ActiveActorStatus> &>(this->StatusStorage);
}
const swift::atomic<ActiveActorStatus> &_status() const {
return reinterpret_cast<const swift::atomic<ActiveActorStatus>&> (this->StatusStorage);
return reinterpret_cast<const swift::atomic<ActiveActorStatus> &>(this->StatusStorage);
}
// Only for static assert use below, not for actual use otherwise
@@ -1175,6 +1168,11 @@ private:
/// It can be done when actor transitions from Idle to Scheduled or
/// when actor gets a priority override and we schedule a stealer.
void scheduleActorProcessJob(JobPriority priority);
/// Processes claimed incoming jobs into `prioritizedJobs`.
/// Incoming jobs are of mixed priorities and in LIFO order.
/// Called with actor lock held on current thread.
void handleUnprioritizedJobs(Job *head);
#endif /* !SWIFT_CONCURRENCY_ACTORS_AS_LOCKS */
void deallocateUnconditional();
@@ -1250,135 +1248,10 @@ static NonDefaultDistributedActorImpl *asImpl(NonDefaultDistributedActor *actor)
/*****************************************************************************/
#if !SWIFT_CONCURRENCY_ACTORS_AS_LOCKS
/// Given that a job is enqueued normally on a default actor, get/set
/// the next job in the actor's queue.
static JobRef getNextJobInQueue(Job *job) {
return *reinterpret_cast<JobRef*>(job->SchedulerPrivate);
}
static void setNextJobInQueue(Job *job, JobRef next) {
*reinterpret_cast<JobRef*>(job->SchedulerPrivate) = next;
}
namespace {
struct JobQueueTraits {
static Job *getNext(Job *job) {
return getNextJobInQueue(job).getAsPreprocessedJob();
}
static void setNext(Job *job, Job *next) {
setNextJobInQueue(job, JobRef::getPreprocessed(next));
}
static int compare(Job *lhs, Job *rhs) {
return descendingPriorityOrder(lhs->getPriority(), rhs->getPriority());
}
};
} // end anonymous namespace
// Called with the actor drain lock held
//
// This function is called when we hit a conflict between preprocessQueue and
// a concurrent enqueuer resulting in unprocessed jobs being queued up in the
// middle.
//
// We need to find the unprocessed jobs enqueued by the enqueuer and process
// them - We know that these unprocessed jobs must exist between the new head
// and the previous start. We can then process these jobs and merge them into
// the already processed list of jobs from the previous iteration of
// preprocessQueue
static Job *
preprocessQueue(JobRef unprocessedStart, JobRef unprocessedEnd, Job *existingProcessedJobsToMergeInto)
{
assert(existingProcessedJobsToMergeInto != NULL);
assert(unprocessedStart.needsPreprocessing());
assert(unprocessedStart.getAsJob() != unprocessedEnd.getAsJob());
// Build up a list of jobs we need to preprocess
using ListMerger = swift::ListMerger<Job*, JobQueueTraits>;
ListMerger jobsToProcess;
// Get just the prefix list of unprocessed jobs
auto current = unprocessedStart;
while (current != unprocessedEnd) {
assert(current.needsPreprocessing());
// Advance current to next pointer and process current unprocessed job
auto job = current.getAsJob();
current = getNextJobInQueue(job);
jobsToProcess.insertAtFront(job);
}
// Finish processing the unprocessed jobs
Job *newProcessedJobs = jobsToProcess.release();
assert(newProcessedJobs);
ListMerger mergedList(existingProcessedJobsToMergeInto);
mergedList.merge(newProcessedJobs);
return mergedList.release();
}
// Called with the actor drain lock held.
//
// Preprocess the queue starting from the top
static Job *
preprocessQueue(JobRef start) {
if (!start) {
return NULL;
}
// Entire queue is well formed, no pre-processing needed
if (!start.needsPreprocessing()) {
return start.getAsPreprocessedJob();
}
// There exist some jobs which haven't been preprocessed
// Build up a list of jobs we need to preprocess
using ListMerger = swift::ListMerger<Job*, JobQueueTraits>;
ListMerger jobsToProcess;
Job *wellFormedListStart = NULL;
auto current = start;
while (current) {
if (!current.needsPreprocessing()) {
// We can assume that everything from here onwards as being well formed
// and sorted
wellFormedListStart = current.getAsPreprocessedJob();
break;
}
// Advance current to next pointer and insert current fella to jobsToProcess
// list
auto job = current.getAsJob();
current = getNextJobInQueue(job);
jobsToProcess.insertAtFront(job);
}
// Finish processing the unprocessed jobs
auto processedJobHead = jobsToProcess.release();
assert(processedJobHead);
Job *firstJob = NULL;
if (wellFormedListStart) {
// Merge it with already known well formed list if we have one.
ListMerger mergedList(wellFormedListStart);
mergedList.merge(processedJobHead);
firstJob = mergedList.release();
} else {
// Nothing to merge with, just return the head we already have
firstJob = processedJobHead;
}
return firstJob;
}
static void traceJobQueue(DefaultActorImpl *actor, Job *first) {
concurrency::trace::actor_note_job_queue(actor, first, [](Job *job) {
return getNextJobInQueue(job).getAsPreprocessedJob();
});
concurrency::trace::actor_note_job_queue(
actor, first, [](Job *job) { return getNextJob(job); });
}
static SWIFT_ATTRIBUTE_ALWAYS_INLINE void traceActorStateTransition(DefaultActorImpl *actor,
@@ -1418,11 +1291,9 @@ void DefaultActorImpl::enqueue(Job *job, JobPriority priority) {
auto newState = oldState;
// Link this into the queue in the atomic state
JobRef currentHead = oldState.getFirstJob();
setNextJobInQueue(job, currentHead);
JobRef newHead = JobRef::getUnpreprocessed(job);
newState = newState.withFirstJob(newHead);
Job *currentHead = oldState.getFirstUnprioritisedJob();
setNextJob(job, currentHead);
newState = newState.withFirstUnprioritisedJob(job);
if (oldState.isIdle()) {
// Schedule the actor
@@ -1547,56 +1418,67 @@ void DefaultActorImpl::enqueueStealer(Job *job, JobPriority priority) {
}
// Called with actor lock held on current thread
Job * DefaultActorImpl::drainOne() {
SWIFT_TASK_DEBUG_LOG("Draining one job from default actor %p", this);
void DefaultActorImpl::processIncomingQueue() {
// Pairs with the store release in DefaultActorImpl::enqueue
bool distributedActorIsRemote = swift_distributed_actor_is_remote(this);
auto oldState = _status().load(SWIFT_MEMORY_ORDER_CONSUME);
_swift_tsan_consume(this);
auto jobToPreprocessFrom = oldState.getFirstJob();
Job *firstJob = preprocessQueue(jobToPreprocessFrom);
traceJobQueue(this, firstJob);
// We must ensure that any jobs not seen by collectJobs() don't have any
// dangling references to the jobs that have been collected. For that we must
// atomically set head pointer to NULL. If it fails because more jobs have
// been added in the meantime, we have to re-read the head pointer.
while (true) {
// If there aren't any new jobs in the incoming queue, we can return
// immediately without updating the status.
if (!oldState.getFirstUnprioritisedJob()) {
return;
}
assert(oldState.isAnyRunning());
if (!firstJob) {
// Nothing to drain, short circuit
SWIFT_TASK_DEBUG_LOG("No jobs to drain on actor %p", this);
return NULL;
}
auto newState = oldState;
// Dequeue the first job and set up a new head
newState = newState.withFirstJob(getNextJobInQueue(firstJob));
if (_status().compare_exchange_weak(oldState, newState,
/* success */ std::memory_order_relaxed,
/* failure */ std::memory_order_relaxed)) {
SWIFT_TASK_DEBUG_LOG("Drained first job %p from actor %p", firstJob, this);
traceActorStateTransition(this, oldState, newState, distributedActorIsRemote);
concurrency::trace::actor_dequeue(this, firstJob);
return firstJob;
}
newState = newState.withFirstUnprioritisedJob(nullptr);
// We failed the weak cmpxchg spuriously, go through loop again.
if (oldState.getFirstJob().getAsJob() == jobToPreprocessFrom.getAsJob()) {
continue;
if (_status().compare_exchange_weak(
oldState, newState,
/* success */ std::memory_order_relaxed,
/* failure */ std::memory_order_relaxed)) {
SWIFT_TASK_DEBUG_LOG("Collected some jobs from actor %p", this);
traceActorStateTransition(this, oldState, newState,
distributedActorIsRemote);
break;
}
// There were new items concurrently added to the queue. We need to
// preprocess the newly added unprocessed items and merge them to the already
// preprocessed list.
//
// The newly merged items that need to be preprocessed, are between the head
// of the linked list, and the last job we did the previous preprocessQueue
// on
firstJob = preprocessQueue(oldState.getFirstJob(), jobToPreprocessFrom, firstJob);
jobToPreprocessFrom = oldState.getFirstJob();
traceJobQueue(this, firstJob);
}
handleUnprioritizedJobs(oldState.getFirstUnprioritisedJob());
}
// Called with actor lock held on current thread
void DefaultActorImpl::handleUnprioritizedJobs(Job *head) {
// Reverse jobs from LIFO to FIFO order
Job *reversed = nullptr;
while (head) {
auto next = getNextJob(head);
setNextJob(head, reversed);
reversed = head;
head = next;
}
prioritizedJobs.enqueueContentsOf(reversed);
}
// Called with actor lock held on current thread
Job *DefaultActorImpl::drainOne() {
SWIFT_TASK_DEBUG_LOG("Draining one job from default actor %p", this);
traceJobQueue(this, prioritizedJobs.peek());
auto firstJob = prioritizedJobs.dequeue();
if (!firstJob) {
SWIFT_TASK_DEBUG_LOG("No jobs to drain on actor %p", this);
} else {
SWIFT_TASK_DEBUG_LOG("Drained first job %p from actor %p", firstJob, this);
concurrency::trace::actor_dequeue(this, firstJob);
}
return firstJob;
}
// Called from processing jobs which are created to drain an actor. We need to
@@ -1651,39 +1533,40 @@ static void defaultActorDrain(DefaultActorImpl *actor) {
TaskExecutorRef::undefined());
while (true) {
Job *job = currentActor->drainOne();
if (job == NULL) {
// No work left to do, try unlocking the actor. This may fail if there is
// work concurrently enqueued in which case, we'd try again in the loop
if (currentActor->unlock(false)) {
break;
}
} else {
if (AsyncTask *task = dyn_cast<AsyncTask>(job)) {
auto taskExecutor = task->getPreferredTaskExecutor();
trackingInfo.setTaskExecutor(taskExecutor);
}
// This thread is now going to follow the task on this actor. It may hop off
// the actor
runJobInEstablishedExecutorContext(job);
// We could have come back from the job on a generic executor and not as
// part of a default actor. If so, there is no more work left for us to do
// here.
auto currentExecutor = trackingInfo.getActiveExecutor();
if (!currentExecutor.isDefaultActor()) {
currentActor = nullptr;
break;
}
currentActor = asImpl(currentExecutor.getDefaultActor());
}
if (shouldYieldThread()) {
currentActor->unlock(true);
break;
}
Job *job = currentActor->drainOne();
if (job == NULL) {
// No work left to do, try unlocking the actor. This may fail if there is
// work concurrently enqueued in which case, we'd try again in the loop
if (!currentActor->unlock(false)) {
continue;
}
break;
}
if (AsyncTask *task = dyn_cast<AsyncTask>(job)) {
auto taskExecutor = task->getPreferredTaskExecutor();
trackingInfo.setTaskExecutor(taskExecutor);
}
// This thread is now going to follow the task on this actor. It may hop off
// the actor
runJobInEstablishedExecutorContext(job);
// We could have come back from the job on a generic executor and not as
// part of a default actor. If so, there is no more work left for us to do
// here.
auto currentExecutor = trackingInfo.getActiveExecutor();
if (!currentExecutor.isDefaultActor()) {
currentActor = nullptr;
break;
}
currentActor = asImpl(currentExecutor.getDefaultActor());
currentActor->processIncomingQueue();
}
// Leave the tracking info.
@@ -1714,16 +1597,18 @@ void DefaultActorImpl::destroy() {
#if SWIFT_CONCURRENCY_ACTORS_AS_LOCKS
// TODO (rokhinip): Do something to assert that the lock is unowned
#else
auto oldState = _status().load(std::memory_order_relaxed);
auto oldState = _status().load(std::memory_order_acquire);
// Tasks on an actor are supposed to keep the actor alive until they start
// running and we can only get here if ref count of the object = 0 which means
// there should be no more tasks enqueued on the actor.
assert(!oldState.getFirstJob() && "actor has queued jobs at destruction");
assert(!oldState.getFirstUnprioritisedJob() && "actor has queued jobs at destruction");
if (oldState.isIdle()) {
return;
assert(prioritizedJobs.empty() && "actor has queued jobs at destruction");
return;
}
assert(oldState.isRunning() && "actor scheduled but not running at destruction");
// In running state we cannot safely access prioritizedJobs to assert that it is empty.
#endif
}
@@ -1775,7 +1660,7 @@ retry:;
bool distributedActorIsRemote = swift_distributed_actor_is_remote(this);
auto oldState = _status().load(std::memory_order_relaxed);
while (true) {
bool assertNoJobs = false;
if (asDrainer) {
#if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION
if (!oldState.isScheduled()) {
@@ -1816,7 +1701,10 @@ retry:;
}
assert(oldState.getMaxPriority() == JobPriority::Unspecified);
assert(!oldState.getFirstJob());
assert(!oldState.getFirstUnprioritisedJob());
// We cannot assert here that prioritizedJobs is empty,
// because lock is not held yet. Raise a flag to assert after getting the lock.
assertNoJobs = true;
}
// Taking the drain lock clears the max priority escalated bit because we've
@@ -1824,12 +1712,29 @@ retry:;
auto newState = oldState.withRunning();
newState = newState.withoutEscalatedPriority();
// Claim incoming jobs when obtaining lock as a drainer, to save one
// round of atomic load and compare-exchange.
// This is not useful when obtaining lock for assuming thread during actor
// switching, because arbitrary use code can run between locking and
// draining the next job. So we still need to call processIncomingQueue() to
// check for higher priority jobs that could have been scheduled in the
// meantime. And processing is more efficient when done in larger batches.
if (asDrainer) {
newState = newState.withFirstUnprioritisedJob(nullptr);
}
// This needs an acquire since we are taking a lock
if (_status().compare_exchange_weak(oldState, newState,
std::memory_order_acquire,
std::memory_order_relaxed)) {
_swift_tsan_acquire(this);
if (assertNoJobs) {
assert(prioritizedJobs.empty());
}
traceActorStateTransition(this, oldState, newState, distributedActorIsRemote);
if (asDrainer) {
handleUnprioritizedJobs(oldState.getFirstUnprioritisedJob());
}
return true;
}
}
@@ -1877,7 +1782,8 @@ bool DefaultActorImpl::unlock(bool forceUnlock)
}
auto newState = oldState;
if (oldState.getFirstJob()) {
// Lock is still held at this point, so it is safe to access prioritizedJobs
if (!prioritizedJobs.empty() || oldState.getFirstUnprioritisedJob()) {
// There is work left to do, don't unlock the actor
if (!forceUnlock) {
SWIFT_TASK_DEBUG_LOG("Unlock-ing actor %p failed", this);
@@ -1915,7 +1821,6 @@ bool DefaultActorImpl::unlock(bool forceUnlock)
if (newState.isScheduled()) {
// See ownership rule (6) in DefaultActorImpl
assert(newState.getFirstJob());
scheduleActorProcessJob(newState.getMaxPriority());
} else {
// See ownership rule (5) in DefaultActorImpl

View File

@@ -28,7 +28,7 @@
# include <thread>
#endif
#include <errno.h>
#include "swift/Basic/ListMerger.h"
#include "swift/Basic/PriorityQueue.h"
#if __has_include(<time.h>)
# include <time.h>
@@ -50,11 +50,12 @@ struct JobQueueTraits {
static void setNext(Job *job, Job *next) {
storage(job) = next;
}
static int compare(Job *lhs, Job *rhs) {
return descendingPriorityOrder(lhs->getPriority(), rhs->getPriority());
enum { prioritiesCount = PriorityBucketCount };
static int getPriorityIndex(Job *job) {
return getPriorityBucketIndex(job->getPriority());
}
};
using JobQueueMerger = ListMerger<Job*, JobQueueTraits>;
using JobPriorityQueue = PriorityQueue<Job*, JobQueueTraits>;
using JobDeadline = std::chrono::time_point<std::chrono::steady_clock>;
@@ -98,17 +99,14 @@ struct JobDeadlineStorage<false> {
} // end anonymous namespace
static Job *JobQueue = nullptr;
static JobPriorityQueue JobQueue;
static Job *DelayedJobQueue = nullptr;
/// Insert a job into the cooperative global queue.
SWIFT_CC(swift)
static void swift_task_enqueueGlobalImpl(Job *job) {
assert(job && "no job provided");
JobQueueMerger merger(JobQueue);
merger.insert(job);
JobQueue = merger.release();
JobQueue.enqueue(job);
}
/// Enqueues a task on the main executor.
@@ -188,7 +186,6 @@ static void recognizeReadyDelayedJobs() {
if (!nextDelayedJob) return;
auto now = std::chrono::steady_clock::now();
JobQueueMerger readyJobs(JobQueue);
// Pull jobs off of the delayed-jobs queue whose deadline has been
// reached, and add them to the ready queue.
@@ -198,11 +195,9 @@ static void recognizeReadyDelayedJobs() {
JobDeadlineStorage<>::destroy(nextDelayedJob);
auto next = JobQueueTraits::getNext(nextDelayedJob);
readyJobs.insert(nextDelayedJob);
JobQueue.enqueue(nextDelayedJob);
nextDelayedJob = next;
}
JobQueue = readyJobs.release();
DelayedJobQueue = nextDelayedJob;
}
@@ -232,8 +227,7 @@ static Job *claimNextFromCooperativeGlobalQueue() {
recognizeReadyDelayedJobs();
// If there's a job in the primary queue, run it.
if (auto job = JobQueue) {
JobQueue = JobQueueTraits::getNext(job);
if (auto job = JobQueue.dequeue()) {
return job;
}

View File

@@ -47,8 +47,7 @@ void actor_dequeue(HeapObject *actor, Job *job);
// State values are:
// Idle = 0, Scheduled = 1, Running = 2, Zombie_ReadyForDeallocation = 3,
// invalid/unknown = 255
void actor_state_changed(HeapObject *actor, Job *firstJob,
bool needsPreprocessing, uint8_t state,
void actor_state_changed(HeapObject *actor, Job *firstJob, uint8_t state,
bool isDistributedRemote, bool isPriorityEscalated,
uint8_t maxPriority);

View File

@@ -143,8 +143,7 @@ inline void actor_dequeue(HeapObject *actor, Job *job) {
}
}
inline void actor_state_changed(HeapObject *actor, Job *firstJob,
bool needsPreprocessing, uint8_t state,
inline void actor_state_changed(HeapObject *actor, Job *firstJob, uint8_t state,
bool isDistributedRemote,
bool isPriorityEscalated, uint8_t maxPriority) {
ENSURE_LOGS();
@@ -153,8 +152,8 @@ inline void actor_state_changed(HeapObject *actor, Job *firstJob,
"actor=%p needsPreprocessing=%d "
"state=%u isDistributedRemote=%{bool}d "
"isPriorityEscalated=%{bool}d, maxPriority=%u",
actor, needsPreprocessing, state, isDistributedRemote,
isPriorityEscalated, maxPriority);
actor, (firstJob != nullptr), state,
isDistributedRemote, isPriorityEscalated, maxPriority);
}
inline void actor_note_job_queue(HeapObject *actor, Job *first,

View File

@@ -33,8 +33,7 @@ inline void actor_enqueue(HeapObject *actor, Job *job) {}
inline void actor_dequeue(HeapObject *actor, Job *job) {}
inline void actor_state_changed(HeapObject *actor, Job *firstJob,
bool needsPreprocessing, uint8_t state,
inline void actor_state_changed(HeapObject *actor, Job *firstJob, uint8_t state,
bool isDistributedRemote,
bool isPriorityEscalated, uint8_t maxPriority) {
}

View File

@@ -1,13 +0,0 @@
SWIFT_SRCROOT=${CURDIR}/../..
SRCROOT=${SWIFT_SRCROOT}/..
LLVM_SRCROOT=${SRCROOT}/llvm/
LLVM_OBJROOT=${SRCROOT}/build/Ninja-DebugAssert/llvm-macosx-x86_64
HEADERS=${SWIFT_SRCROOT}/include/swift/Basic/ListMerger.h
CXXFLAGS=-Wall -std=c++17 -stdlib=libc++ -D__STDC_LIMIT_MACROS -D__STDC_CONSTANT_MACROS -I${OBJROOT}/include -I${SWIFT_SRCROOT}/include -I${LLVM_SRCROOT}/include -I${LLVM_OBJROOT}/include
TestListMerger: TestListMerger.o
$(CXX) -L${LLVM_OBJROOT}/lib -lLLVMSupport $< -o $@
TestListMerger.o: ${HEADERS}

View File

@@ -1,252 +0,0 @@
#include "swift/Basic/ListMerger.h"
#include "llvm/ADT/ArrayRef.h"
#include <vector>
#include <random>
#include <iostream>
using namespace swift;
static int compare_unsigned(unsigned lhs, unsigned rhs) {
return (lhs < rhs ? -1 : lhs > rhs ? 1 : 0);
}
namespace {
enum EntryOrder {
creationOrder,
reverseCreationOrder
};
struct Entry {
unsigned id;
unsigned value;
Entry *next;
};
class EntryFactory {
std::vector<std::unique_ptr<Entry>> entries;
unsigned nextID = 0;
public:
Entry *create(unsigned value) {
auto entry = new Entry{nextID++, value, nullptr};
entries.emplace_back(entry);
return entry;
}
/// Sort the entries in this list.
///
/// \param reverseCreationOrder - if true, then order equal-value
/// nodes in the reverse of creation order; otherwise
/// creator order of creation order
void sort(EntryOrder order) {
std::sort(entries.begin(), entries.end(),
[=](const std::unique_ptr<Entry> &lhs,
const std::unique_ptr<Entry> &rhs) {
if (lhs->value != rhs->value) return lhs->value < rhs->value;
return order == creationOrder
? lhs->id < rhs->id
: lhs->id > rhs->id;
});
}
void checkSameAs(Entry *list) {
for (auto &entry: entries) {
std::cout << " " << list->value << " (" << list->id << ")\n";
assert(list == entry.get());
list = list->next;
};
assert(list == nullptr);
}
};
struct EntryListTraits {
static Entry *getNext(Entry *e) { return e->next; }
static void setNext(Entry *e, Entry *next) { e->next = next; }
static int compare(Entry *lhs, Entry *rhs) {
return compare_unsigned(lhs->value, rhs->value);
}
};
using EntryListMerger = ListMerger<Entry*, EntryListTraits>;
enum Op {
insert,
beginMerge,
endMerge
};
/// An instruction to the test harness: either a simple value
/// (an "insert") or one of the special instructions.
struct Instruction {
Op op;
unsigned value;
Instruction(Op op) : op(op), value(0) { assert(op != insert); }
Instruction(unsigned value) : op(insert), value(value) {}
friend std::ostream &operator<<(std::ostream &str, const Instruction &inst) {
switch (inst.op) {
case insert: str << inst.value; break;
case beginMerge: str << "beginMerge"; break;
case endMerge: str << "endMerge"; break;
}
return str;
}
};
} // end anonymous namespace
template <class T>
static std::ostream &operator<<(std::ostream &str, llvm::ArrayRef<T> list) {
str << "{";
for (auto b = list.begin(), i = b, e = list.end(); i != e; ++i) {
if (i != b) str << ", ";
str << *i;
}
str << "}";
return str;
}
template <class T>
static std::ostream &operator<<(std::ostream &str, const std::vector<T> &list) {
return (str << llvm::ArrayRef(list));
}
static void runInsertAndMergeTest(llvm::ArrayRef<Instruction> values) {
EntryFactory entries;
EntryListMerger merger;
// Between beginMerge and endMerge instructions, values don't get
// inserted immediately: they build up into a separate list of items
// that will be merged at the time of the endMerge. We record this
// mode by making lastMergeEntry non-null.
Entry *firstMergeEntry;
Entry **lastMergeEntry = nullptr;
for (auto &inst : values) {
switch (inst.op) {
case insert: {
// Create the new entry.
Entry *entry = entries.create(inst.value);
// If we're building a merge list, append to the end of it.
if (lastMergeEntry) {
*lastMergeEntry = entry;
lastMergeEntry = &entry->next;
// Otherwise, just do an insertion.
} else {
merger.insert(entry);
}
break;
}
case beginMerge:
assert(!lastMergeEntry && "already building a merge list");
lastMergeEntry = &firstMergeEntry;
break;
case endMerge:
assert(lastMergeEntry && "not building a merge list?");
// Cap off the merge list we built.
*lastMergeEntry = nullptr;
// Do the merge.
merger.merge(firstMergeEntry);
// We're no longer building a merge list.
lastMergeEntry = nullptr;
break;
}
}
assert(!lastMergeEntry && "ended while still building a merge list");
entries.sort(creationOrder);
entries.checkSameAs(merger.release());
}
static void runInsertAtFrontTest(llvm::ArrayRef<unsigned> values) {
EntryFactory entries;
EntryListMerger merger;
for (auto value: values) {
merger.insertAtFront(entries.create(value));
}
entries.sort(reverseCreationOrder);
entries.checkSameAs(merger.release());
}
static void runConcreteTests() {
runInsertAndMergeTest({ 5, 0, 3, 0, 1, 0, 7 });
}
namespace {
struct TestConfig {
unsigned numTests;
unsigned numEntries;
unsigned maxValue;
};
}
static void runInsertAndMergeTests(const TestConfig &config) {
std::random_device randomDevice;
std::default_random_engine e(randomDevice());
std::uniform_int_distribution<unsigned> valueDist(0, config.maxValue);
// Chance of entering or exiting a merge.
std::uniform_int_distribution<unsigned> mergeDist(0, 20);
std::vector<Instruction> ins;
for (unsigned testN = 0; testN < config.numTests; ++testN) {
ins.clear();
const size_t noMerge = -1;
size_t mergeStart = noMerge;
for (unsigned i = 0; i < config.numEntries || mergeStart != noMerge; ++i) {
if (mergeDist(e) == 0) {
if (mergeStart != noMerge) {
std::sort(ins.begin() + mergeStart, ins.end(),
[](const Instruction &lhs, const Instruction &rhs) {
return lhs.value < rhs.value;
});
ins.push_back(endMerge);
mergeStart = noMerge;
} else {
ins.push_back(beginMerge);
mergeStart = ins.size();
}
} else {
ins.push_back(valueDist(e));
}
}
std::cout << "runInsertAndMergeTest(" << ins << ");" << std::endl;
runInsertAndMergeTest(ins);
}
}
static void runInsertAtFrontTests(const TestConfig &config) {
std::random_device randomDevice;
std::default_random_engine e(randomDevice());
std::uniform_int_distribution<unsigned> valueDist(0, config.maxValue);
std::vector<unsigned> ins;
for (unsigned testN = 0; testN < config.numTests; ++testN) {
ins.clear();
for (unsigned i = 0; i < config.numEntries; ++i) {
ins.push_back(valueDist(e));
}
std::cout << "runInsertAtFrontTest(" << ins << ");" << std::endl;
runInsertAtFrontTest(ins);
}
}
int main() {
TestConfig config = {
.numTests = 1000,
.numEntries = 2000,
.maxValue = 3
};
runConcreteTests();
runInsertAndMergeTests(config);
runInsertAtFrontTests(config);
}

View File

@@ -0,0 +1,11 @@
SWIFT_SRCROOT=${CURDIR}/../..
SRCROOT=${SWIFT_SRCROOT}/..
HEADERS=${SWIFT_SRCROOT}/include/swift/Basic/PriorityQueue.h
CXXFLAGS=-Wall -std=c++17 -stdlib=libc++ -D__STDC_LIMIT_MACROS -D__STDC_CONSTANT_MACROS -I${OBJROOT}/include -I${SWIFT_SRCROOT}/include
TestPriorityQueue: TestPriorityQueue.o
$(CXX) $< -o $@
TestPriorityQueue.o: ${HEADERS}

View File

@@ -0,0 +1,323 @@
#include <cassert>
#include <iostream>
#include <random>
#include <vector>
#define private public
#include "swift/Basic/PriorityQueue.h"
using namespace swift;
namespace {
struct Entry {
unsigned id;
unsigned value;
Entry *next;
};
class EntryFactory {
std::vector<std::unique_ptr<Entry>> entries;
unsigned nextID = 0;
public:
Entry *create(unsigned value) {
auto entry = new Entry{nextID++, value, nullptr};
entries.emplace_back(entry);
return entry;
}
void remove(Entry *e) {
auto it = std::find_if(
entries.begin(), entries.end(),
[=](const std::unique_ptr<Entry> &ptr) { return ptr.get() == e; });
assert(it != entries.end());
entries.erase(it);
}
/// Sort the entries in this list.
///
/// \param reverseCreationOrder - if true, then order equal-value
/// nodes in the reverse of creation order; otherwise
/// creator order of creation order
void sort() {
std::sort(entries.begin(), entries.end(),
[=](const std::unique_ptr<Entry> &lhs,
const std::unique_ptr<Entry> &rhs) {
if (lhs->value != rhs->value)
return lhs->value < rhs->value;
return lhs->id < rhs->id;
});
}
void checkSameAs(Entry *list, unsigned line) {
std::cout << " <<< check " << line << std::endl;
for (auto &entry : entries) {
std::cout << " " << list->value << " (" << list->id << ")\n";
assert(list == entry.get());
list = list->next;
};
assert(list == nullptr);
}
};
struct EntryListTraits {
static Entry *getNext(Entry *e) { return e->next; }
static void setNext(Entry *e, Entry *next) { e->next = next; }
enum { prioritiesCount = 4 };
static int getPriorityIndex(Entry *e) {
assert(e->value < prioritiesCount);
return (int)e->value;
}
};
using EntryPriorityQueue = PriorityQueue<Entry *, EntryListTraits>;
struct ListBuilder {
Entry *head;
Entry **pTail;
ListBuilder(): head(), pTail(&head) {}
ListBuilder(ListBuilder const &) = delete;
ListBuilder(ListBuilder &&) = delete;
void append(Entry *e) {
*pTail = e;
e->next = nullptr;
pTail = &e->next;
}
Entry *take() {
Entry* result = head;
head = nullptr;
pTail = &head;
return result;
}
};
static void runEnqueueDequeueTest() {
std::cout << "runEnqueueDequeueTest()" << std::endl;
EntryFactory entries;
EntryPriorityQueue queue;
assert(!queue.head);
assert(!queue.tails[0]);
assert(!queue.tails[1]);
assert(!queue.tails[2]);
assert(!queue.tails[3]);
auto first = entries.create(3);
queue.enqueue(first);
assert(queue.head == first);
assert(!queue.tails[0]);
assert(!queue.tails[1]);
assert(!queue.tails[2]);
assert(queue.tails[3] == first);
auto second = entries.create(3);
queue.enqueue(second);
assert(queue.head == first);
assert(!queue.tails[0]);
assert(!queue.tails[1]);
assert(!queue.tails[2]);
assert(queue.tails[3] == second);
auto third = entries.create(1);
queue.enqueue(third);
assert(queue.head == third);
assert(!queue.tails[0]);
assert(queue.tails[1] == third);
assert(!queue.tails[2]);
assert(queue.tails[3] == second);
auto fourth = entries.create(2);
queue.enqueue(fourth);
assert(queue.head == third);
assert(!queue.tails[0]);
assert(queue.tails[1] == third);
assert(queue.tails[2] == fourth);
assert(queue.tails[3] == second);
entries.sort();
entries.checkSameAs(queue.head, __LINE__);
auto pop = [&](Entry *expected) {
auto e = queue.dequeue();
assert(e == expected);
entries.remove(e);
entries.sort();
entries.checkSameAs(queue.head, __LINE__);
};
pop(third);
assert(queue.head == fourth);
assert(!queue.tails[0]);
assert(!queue.tails[1]);
assert(queue.tails[2] == fourth);
assert(queue.tails[3] == second);
pop(fourth);
assert(queue.head == first);
assert(!queue.tails[0]);
assert(!queue.tails[1]);
assert(!queue.tails[2]);
assert(queue.tails[3] == second);
pop(first);
assert(queue.head == second);
assert(!queue.tails[0]);
assert(!queue.tails[1]);
assert(!queue.tails[2]);
assert(queue.tails[3] == second);
pop(second);
assert(!queue.head);
assert(!queue.tails[0]);
assert(!queue.tails[1]);
assert(!queue.tails[2]);
assert(!queue.tails[3]);
assert(!queue.dequeue());
assert(!queue.head);
assert(!queue.tails[0]);
assert(!queue.tails[1]);
assert(!queue.tails[2]);
assert(!queue.tails[3]);
}
static void runEnqueueContentsOfTest() {
std::cout << "runEnqueueContentsOfTest()" << std::endl;
EntryFactory entries;
EntryPriorityQueue queue;
ListBuilder builder;
assert(!queue.head);
assert(!queue.tails[0]);
assert(!queue.tails[1]);
assert(!queue.tails[2]);
assert(!queue.tails[3]);
queue.enqueueContentsOf(builder.take());
assert(!queue.head);
assert(!queue.tails[0]);
assert(!queue.tails[1]);
assert(!queue.tails[2]);
assert(!queue.tails[3]);
auto first = entries.create(3);
builder.append(first);
auto second = entries.create(3);
builder.append(second);
auto third = entries.create(1);
builder.append(third);
auto fourth = entries.create(2);
builder.append(fourth);
auto fifth = entries.create(2);
builder.append(fifth);
auto sixth = entries.create(1);
builder.append(sixth);
queue.enqueueContentsOf(builder.take());
assert(queue.head == third);
assert(!queue.tails[0]);
assert(queue.tails[1] == sixth);
assert(queue.tails[2] == fifth);
assert(queue.tails[3] == second);
entries.sort();
entries.checkSameAs(queue.head, __LINE__);
auto seventh = entries.create(0);
builder.append(seventh);
auto eighth = entries.create(0);
builder.append(eighth);
auto ninth = entries.create(0);
builder.append(ninth);
auto tenth = entries.create(3);
builder.append(tenth);
queue.enqueueContentsOf(builder.take());
assert(queue.head == seventh);
assert(queue.tails[0] == ninth);
assert(queue.tails[1] == sixth);
assert(queue.tails[2] == fifth);
assert(queue.tails[3] == tenth);
entries.sort();
entries.checkSameAs(queue.head, __LINE__);
}
static void runEnqueueDequeueTests(unsigned numTests, unsigned maxEntries) {
std::random_device randomDevice;
std::default_random_engine e(randomDevice());
std::uniform_int_distribution<unsigned> valueDist(
0, EntryListTraits::prioritiesCount - 1);
std::uniform_int_distribution<unsigned> numEntriesDist(0, maxEntries);
for (unsigned testN = 0; testN < numTests; ++testN) {
std::cout << "runEnqueueDequeueTests() #" << testN << std::endl;
EntryFactory entries;
EntryPriorityQueue queue;
unsigned numEntries = numEntriesDist(e);
std::cout << "numEntries = " << numEntries << std::endl;
for (unsigned i = 0; i < numEntries; ++i) {
auto value = valueDist(e);
std::cout << "-- " << value << std::endl;
queue.enqueue(entries.create(value));
}
entries.sort();
entries.checkSameAs(queue.head, __LINE__);
for (unsigned i = 0; i < numEntries; ++i) {
auto e = queue.dequeue();
std::cout << "pop " << e->value << std::endl;
entries.remove(e);
entries.sort();
entries.checkSameAs(queue.head, __LINE__);
}
}
}
static void runEnqueueContentsOfTests(unsigned numTests, unsigned maxChains, unsigned maxEntries) {
std::random_device randomDevice;
std::default_random_engine e(randomDevice());
std::uniform_int_distribution<unsigned> valueDist(
0, EntryListTraits::prioritiesCount - 1);
std::uniform_int_distribution<unsigned> numChainsDist(1, maxChains);
std::uniform_int_distribution<unsigned> numEntriesDist(0, maxEntries);
for (unsigned testN = 0; testN < numTests; ++testN) {
std::cout << "runEnqueueContentsOfTests() #" << testN << std::endl;
EntryFactory entries;
EntryPriorityQueue queue;
unsigned totalEntries = 0;
unsigned numChains = numChainsDist(e);
std::cout << "numChains = " << numChains << std::endl;
for (unsigned i = 0; i < numChains; ++i) {
unsigned numEntries = numEntriesDist(e);
std::cout << "numEntries = " << numEntries << std::endl;
totalEntries += numEntries;
ListBuilder builder;
for (unsigned j = 0; j < numEntries; ++j) {
auto value = valueDist(e);
std::cout << "-- " << value << std::endl;
builder.append(entries.create(value));
}
queue.enqueueContentsOf(builder.take());
}
entries.sort();
entries.checkSameAs(queue.head, __LINE__);
for (unsigned i = 0; i < totalEntries; ++i) {
auto e = queue.dequeue();
std::cout << "pop " << e->value << std::endl;
entries.remove(e);
entries.sort();
entries.checkSameAs(queue.head, __LINE__);
}
}
}
} // namespace
int main() {
runEnqueueDequeueTest();
runEnqueueContentsOfTest();
runEnqueueDequeueTests(1000, 20);
runEnqueueContentsOfTests(1000, 10, 20);
}