diff --git a/Sources/LanguageServerProtocol/Connection.swift b/Sources/LanguageServerProtocol/Connection.swift index 8d79a23b..2795b523 100644 --- a/Sources/LanguageServerProtocol/Connection.swift +++ b/Sources/LanguageServerProtocol/Connection.swift @@ -46,7 +46,7 @@ public protocol MessageHandler: AnyObject { /// The method should return as soon as the notification has been sufficiently /// handled to avoid out-of-order requests, e.g. once the notification has /// been forwarded to clangd. - func handle(_ params: some NotificationType, from clientID: ObjectIdentifier) async + func handle(_ params: some NotificationType, from clientID: ObjectIdentifier) /// Handle a request and (asynchronously) receive a reply. /// @@ -54,7 +54,7 @@ public protocol MessageHandler: AnyObject { /// handled to avoid out-of-order requests, e.g. once the corresponding /// request has been sent to sourcekitd. The actual semantic computation /// should occur after the method returns and report the result via `reply`. - func handle(_ params: Request, id: RequestID, from clientID: ObjectIdentifier, reply: @escaping (LSPResult) -> Void) async + func handle(_ params: Request, id: RequestID, from clientID: ObjectIdentifier, reply: @escaping (LSPResult) -> Void) } /// A connection between two message handlers in the same process. @@ -77,18 +77,7 @@ public final class LocalConnection { /// The queue guarding `_nextRequestID`. let queue: DispatchQueue = DispatchQueue(label: "local-connection-queue") - - /// The queue on which all messages (notifications, requests, responses) are - /// handled. - /// - /// The queue is blocked until the message has been sufficiently handled to - /// avoid out-of-order handling of messages. For sourcekitd, this means that - /// a request has been sent to sourcekitd and for clangd, this means that we - /// have forwarded the request to clangd. - /// - /// The actual semantic handling of the message happens off this queue. - let messageHandlingQueue: AsyncQueue = AsyncQueue(.serial) - + var _nextRequestID: Int = 0 var state: State = .ready @@ -125,9 +114,7 @@ public final class LocalConnection { extension LocalConnection: Connection { public func send(_ notification: Notification) where Notification: NotificationType { - messageHandlingQueue.async { - await self.handler?.handle(notification, from: ObjectIdentifier(self)) - } + self.handler?.handle(notification, from: ObjectIdentifier(self)) } public func send( @@ -137,19 +124,17 @@ extension LocalConnection: Connection { ) -> RequestID { let id = nextRequestID() - messageHandlingQueue.async { - guard let handler = self.handler else { - queue.async { - reply(.failure(.serverCancelled)) - } - return + guard let handler = self.handler else { + queue.async { + reply(.failure(.serverCancelled)) } + return id + } - precondition(self.state == .started) - await handler.handle(request, id: id, from: ObjectIdentifier(self)) { result in - queue.async { - reply(result) - } + precondition(self.state == .started) + handler.handle(request, id: id, from: ObjectIdentifier(self)) { result in + queue.async { + reply(result) } } diff --git a/Sources/LanguageServerProtocol/Message.swift b/Sources/LanguageServerProtocol/Message.swift index 6ee2291a..f4bbf4da 100644 --- a/Sources/LanguageServerProtocol/Message.swift +++ b/Sources/LanguageServerProtocol/Message.swift @@ -28,7 +28,7 @@ public protocol _RequestType: MessageType { id: RequestID, connection: Connection, reply: @escaping (LSPResult, RequestID) -> Void - ) async + ) } /// A request, which must have a unique `method` name as well as an associated response type. @@ -54,16 +54,16 @@ extension RequestType { id: RequestID, connection: Connection, reply: @escaping (LSPResult, RequestID) -> Void - ) async { - await handler.handle(self, id: id, from: ObjectIdentifier(connection)) { response in + ) { + handler.handle(self, id: id, from: ObjectIdentifier(connection)) { response in reply(response.map({ $0 as ResponseType }), id) } } } extension NotificationType { - public func _handle(_ handler: MessageHandler, connection: Connection) async { - await handler.handle(self, from: ObjectIdentifier(connection)) + public func _handle(_ handler: MessageHandler, connection: Connection) { + handler.handle(self, from: ObjectIdentifier(connection)) } } diff --git a/Sources/LanguageServerProtocolJSONRPC/JSONRPCConnection.swift b/Sources/LanguageServerProtocolJSONRPC/JSONRPCConnection.swift index a204e49c..b299317a 100644 --- a/Sources/LanguageServerProtocolJSONRPC/JSONRPCConnection.swift +++ b/Sources/LanguageServerProtocolJSONRPC/JSONRPCConnection.swift @@ -31,16 +31,6 @@ public final class JSONRPCConnection { /// The queue on which we send data. let sendQueue: DispatchQueue = DispatchQueue(label: "jsonrpc-send-queue", qos: .userInitiated) - /// The queue on which all messages (notifications, requests, responses) are - /// handled. - /// - /// The queue is blocked until the message has been sufficiently handled to - /// avoid out-of-order handling of messages. For sourcekitd, this means that - /// a request has been sent to sourcekitd and for clangd, this means that we - /// have forwarded the request to clangd. - /// - /// The actual semantic handling of the message happens off this queue. - let messageHandlingQueue: AsyncQueue = AsyncQueue(.serial) let receiveIO: DispatchIO let sendIO: DispatchIO let messageRegistry: MessageRegistry @@ -282,17 +272,14 @@ public final class JSONRPCConnection { func handle(_ message: JSONRPCMessage) { switch message { case .notification(let notification): - messageHandlingQueue.async { - await notification._handle(self.receiveHandler!, connection: self) - } + notification._handle(self.receiveHandler!, connection: self) case .request(let request, id: let id): let semaphore: DispatchSemaphore? = syncRequests ? .init(value: 0) : nil - messageHandlingQueue.async { - await request._handle(self.receiveHandler!, id: id, connection: self) { (response, id) in - self.sendReply(response, id: id) - semaphore?.signal() - } + request._handle(self.receiveHandler!, id: id, connection: self) { (response, id) in + self.sendReply(response, id: id) + semaphore?.signal() } + semaphore?.wait() case .response(let response, id: let id): diff --git a/Sources/SKCore/BuildServerBuildSystem.swift b/Sources/SKCore/BuildServerBuildSystem.swift index 68659401..166d2cea 100644 --- a/Sources/SKCore/BuildServerBuildSystem.swift +++ b/Sources/SKCore/BuildServerBuildSystem.swift @@ -51,6 +51,17 @@ public actor BuildServerBuildSystem: MessageHandler { var buildServer: JSONRPCConnection? + /// The queue on which all messages that originate from the build server are + /// handled. + /// + /// These are requests and notifications sent *from* the build server, + /// not replies from the build server. + /// + /// This ensures that messages from the build server are handled in the order + /// they were received. Swift concurrency does not guarentee in-order + /// execution of tasks. + public let bspMessageHandlingQueue = AsyncQueue(.serial) + let searchPaths: [AbsolutePath] public private(set) var indexDatabasePath: AbsolutePath? @@ -167,18 +178,20 @@ public actor BuildServerBuildSystem: MessageHandler { /// the build server has sent us a notification. /// /// We need to notify the delegate about any updated build settings. - public func handle(_ params: some NotificationType, from clientID: ObjectIdentifier) async { - if let params = params as? BuildTargetsChangedNotification { - await self.handleBuildTargetsChanged(Notification(params, clientID: clientID)) - } else if let params = params as? FileOptionsChangedNotification { - await self.handleFileOptionsChanged(Notification(params, clientID: clientID)) + public nonisolated func handle(_ params: some NotificationType, from clientID: ObjectIdentifier) { + bspMessageHandlingQueue.async { + if let params = params as? BuildTargetsChangedNotification { + await self.handleBuildTargetsChanged(Notification(params, clientID: clientID)) + } else if let params = params as? FileOptionsChangedNotification { + await self.handleFileOptionsChanged(Notification(params, clientID: clientID)) + } } } /// Handler for requests received **from** the build server. /// /// We currently can't handle any requests sent from the build server to us. - public func handle( + public nonisolated func handle( _ params: R, id: RequestID, from clientID: ObjectIdentifier, diff --git a/Sources/SourceKitLSP/Clang/ClangLanguageServer.swift b/Sources/SourceKitLSP/Clang/ClangLanguageServer.swift index a4df45b1..790e3189 100644 --- a/Sources/SourceKitLSP/Clang/ClangLanguageServer.swift +++ b/Sources/SourceKitLSP/Clang/ClangLanguageServer.swift @@ -45,6 +45,19 @@ actor ClangLanguageServerShim: ToolchainLanguageServer, MessageHandler { /// The queue on which clangd calls us back. public let clangdCommunicationQueue: DispatchQueue = DispatchQueue(label: "language-server-queue", qos: .userInitiated) + /// The queue on which all messages that originate from clangd are handled. + /// + /// This includes requests and notifications sent *from* clangd and does not + /// include replies from clangd. + /// + /// These are requests and notifications sent *from* clangd, not replies from + /// clangd. + /// + /// Since we are blindly forwarding requests from clangd to the editor, we + /// cannot allow concurrent requests. This should be fine since the number of + /// requests and notifications sent from clangd to the client is quite small. + public let clangdMessageHandlingQueue = AsyncQueue(.serial) + /// The ``SourceKitServer`` instance that created this `ClangLanguageServerShim`. /// /// Used to send requests and notifications to the editor. @@ -255,14 +268,16 @@ actor ClangLanguageServerShim: ToolchainLanguageServer, MessageHandler { /// sending a notification that's intended for the editor. /// /// We should either handle it ourselves or forward it to the editor. - func handle(_ params: some NotificationType, from clientID: ObjectIdentifier) async { - switch params { - case let publishDiags as PublishDiagnosticsNotification: - await self.publishDiagnostics(Notification(publishDiags, clientID: clientID)) - default: - // We don't know how to handle any other notifications and ignore them. - log("Ignoring unknown notification \(type(of: params))", level: .warning) - break + nonisolated func handle(_ params: some NotificationType, from clientID: ObjectIdentifier) { + clangdMessageHandlingQueue.async { + switch params { + case let publishDiags as PublishDiagnosticsNotification: + await self.publishDiagnostics(Notification(publishDiags, clientID: clientID)) + default: + // We don't know how to handle any other notifications and ignore them. + log("Ignoring unknown notification \(type(of: params))", level: .warning) + break + } } } @@ -270,26 +285,24 @@ actor ClangLanguageServerShim: ToolchainLanguageServer, MessageHandler { /// sending a notification that's intended for the editor. /// /// We should either handle it ourselves or forward it to the client. - func handle( + nonisolated func handle( _ params: R, id: RequestID, from clientID: ObjectIdentifier, reply: @escaping (LSPResult) -> Void - ) async { - let request = Request(params, id: id, clientID: clientID, cancellation: CancellationToken(), reply: { result in - reply(result) - }) - guard let sourceKitServer else { - // `SourceKitServer` has been destructed. We are tearing down the language - // server. Nothing left to do. - request.reply(.failure(.unknown("Connection to the editor closed"))) - return - } + ) { + clangdMessageHandlingQueue.async { + let request = Request(params, id: id, clientID: clientID, cancellation: CancellationToken(), reply: { result in + reply(result) + }) + guard let sourceKitServer = await self.sourceKitServer else { + // `SourceKitServer` has been destructed. We are tearing down the language + // server. Nothing left to do. + request.reply(.failure(.unknown("Connection to the editor closed"))) + return + } - if request.clientID == ObjectIdentifier(self.clangd) { await sourceKitServer.sendRequestToClient(request.params, reply: request.reply) - } else { - request.reply(.failure(ResponseError.methodNotFound(R.method))) } } diff --git a/Sources/SourceKitLSP/SourceKitServer.swift b/Sources/SourceKitLSP/SourceKitServer.swift index 2694280f..90f68672 100644 --- a/Sources/SourceKitLSP/SourceKitServer.swift +++ b/Sources/SourceKitLSP/SourceKitServer.swift @@ -136,6 +136,16 @@ public actor SourceKitServer { /// The queue on which we communicate with the client. public let clientCommunicationQueue: DispatchQueue = DispatchQueue(label: "language-server-queue", qos: .userInitiated) + /// The queue on which all messages (notifications, requests, responses) are + /// handled. + /// + /// The queue is blocked until the message has been sufficiently handled to + /// avoid out-of-order handling of messages. For sourcekitd, this means that + /// a request has been sent to sourcekitd and for clangd, this means that we + /// have forwarded the request to clangd. + /// + /// The actual semantic handling of the message happens off this queue. + private let messageHandlingQueue = AsyncQueue(.concurrent) /// The connection to the editor. public let client: Connection @@ -442,116 +452,142 @@ public actor SourceKitServer { // MARK: - MessageHandler extension SourceKitServer: MessageHandler { - public func handle(_ params: some NotificationType, from clientID: ObjectIdentifier) async { - let notification = Notification(params, clientID: clientID) - self._logNotification(notification) + public nonisolated func handle(_ params: some NotificationType, from clientID: ObjectIdentifier) { + // All of the notifications sourcekit-lsp currently handles might modify the + // global state (eg. whether a document is open or its contents) in a way + // that changes the results of requsts before and after. + // We thus need to ensure that we handle the notifications in order, so they + // need to be dispatch barriers. + // + // Technically, we could optimize this further by having an `AsyncQueue` for + // each file, because edits on one file should not block requests on another + // file from executing but, at least in Swift, this would get us any real + // benefits at the moment because sourcekitd only has a single, global queue, + // instead of a queue per file. + // Additionally, usually you are editing one file in a source editor, which + // means that concurrent requests to multiple files tend to be rare. + messageHandlingQueue.async(barrier: true) { + let notification = Notification(params, clientID: clientID) + await self._logNotification(notification) - switch notification { - case let notification as Notification: - self.clientInitialized(notification) - case let notification as Notification: - self.cancelRequest(notification) - case let notification as Notification: - await self.exit(notification) - case let notification as Notification: - await self.openDocument(notification) - case let notification as Notification: - await self.closeDocument(notification) - case let notification as Notification: - await self.changeDocument(notification) - case let notification as Notification: - await self.didChangeWorkspaceFolders(notification) - case let notification as Notification: - await self.didChangeWatchedFiles(notification) - case let notification as Notification: - await self.withLanguageServiceAndWorkspace(for: notification, notificationHandler: self.willSaveDocument) - case let notification as Notification: - await self.withLanguageServiceAndWorkspace(for: notification, notificationHandler: self.didSaveDocument) - default: - break - } - } - - public func handle(_ params: R, id: RequestID, from clientID: ObjectIdentifier, reply: @escaping (LSPResult) -> Void) async { - let cancellationToken = CancellationToken() - let request = Request(params, id: id, clientID: clientID, cancellation: cancellationToken, reply: { [weak self] result in - reply(result) - if let self { - Task { - await self._logResponse(result, id: id, method: R.method) - } + switch notification { + case let notification as Notification: + await self.clientInitialized(notification) + case let notification as Notification: + await self.cancelRequest(notification) + case let notification as Notification: + await self.exit(notification) + case let notification as Notification: + await self.openDocument(notification) + case let notification as Notification: + await self.closeDocument(notification) + case let notification as Notification: + await self.changeDocument(notification) + case let notification as Notification: + await self.didChangeWorkspaceFolders(notification) + case let notification as Notification: + await self.didChangeWatchedFiles(notification) + case let notification as Notification: + await self.withLanguageServiceAndWorkspace(for: notification, notificationHandler: self.willSaveDocument) + case let notification as Notification: + await self.withLanguageServiceAndWorkspace(for: notification, notificationHandler: self.didSaveDocument) + default: + break } - }) - - self._logRequest(request) - - switch request { - case let request as Request: - await self.initialize(request) - case let request as Request: - await self.shutdown(request) - case let request as Request: - self.workspaceSymbols(request) - case let request as Request: - self.pollIndex(request) - case let request as Request: - await self.executeCommand(request) - case let request as Request: - await self.incomingCalls(request) - case let request as Request: - await self.outgoingCalls(request) - case let request as Request: - await self.supertypes(request) - case let request as Request: - await self.subtypes(request) - case let request as Request: - await self.withLanguageServiceAndWorkspace(for: request, requestHandler: self.completion, fallback: CompletionList(isIncomplete: false, items: [])) - case let request as Request: - await self.withLanguageServiceAndWorkspace(for: request, requestHandler: self.hover, fallback: nil) - case let request as Request: - await self.withLanguageServiceAndWorkspace(for: request, requestHandler: self.openInterface, fallback: nil) - case let request as Request: - await self.withLanguageServiceAndWorkspace(for: request, requestHandler: self.declaration, fallback: nil) - case let request as Request: - await self.withLanguageServiceAndWorkspace(for: request, requestHandler: self.definition, fallback: .locations([])) - case let request as Request: - await self.withLanguageServiceAndWorkspace(for: request, requestHandler: self.references, fallback: []) - case let request as Request: - await self.withLanguageServiceAndWorkspace(for: request, requestHandler: self.implementation, fallback: .locations([])) - case let request as Request: - await self.withLanguageServiceAndWorkspace(for: request, requestHandler: self.prepareCallHierarchy, fallback: []) - case let request as Request: - await self.withLanguageServiceAndWorkspace(for: request, requestHandler: self.prepareTypeHierarchy, fallback: []) - case let request as Request: - await self.withLanguageServiceAndWorkspace(for: request, requestHandler: self.symbolInfo, fallback: []) - case let request as Request: - await self.withLanguageServiceAndWorkspace(for: request, requestHandler: self.documentSymbolHighlight, fallback: nil) - case let request as Request: - await self.withLanguageServiceAndWorkspace(for: request, requestHandler: self.foldingRange, fallback: nil) - case let request as Request: - await self.withLanguageServiceAndWorkspace(for: request, requestHandler: self.documentSymbol, fallback: nil) - case let request as Request: - await self.withLanguageServiceAndWorkspace(for: request, requestHandler: self.documentColor, fallback: []) - case let request as Request: - await self.withLanguageServiceAndWorkspace(for: request, requestHandler: self.documentSemanticTokens, fallback: nil) - case let request as Request: - await self.withLanguageServiceAndWorkspace(for: request, requestHandler: self.documentSemanticTokensDelta, fallback: nil) - case let request as Request: - await self.withLanguageServiceAndWorkspace(for: request, requestHandler: self.documentSemanticTokensRange, fallback: nil) - case let request as Request: - await self.withLanguageServiceAndWorkspace(for: request, requestHandler: self.colorPresentation, fallback: []) - case let request as Request: - await self.withLanguageServiceAndWorkspace(for: request, requestHandler: self.codeAction, fallback: nil) - case let request as Request: - await self.withLanguageServiceAndWorkspace(for: request, requestHandler: self.inlayHint, fallback: []) - case let request as Request: - await self.withLanguageServiceAndWorkspace(for: request, requestHandler: self.documentDiagnostic, fallback: .full(.init(items: []))) - default: - reply(.failure(ResponseError.methodNotFound(R.method))) } } - private func _logRequest(_ request: Request) { + public nonisolated func handle(_ params: R, id: RequestID, from clientID: ObjectIdentifier, reply: @escaping (LSPResult) -> Void) { + // All of the requests sourcekit-lsp do not modify global state or require + // the client to wait for the result before using the modified global state. + // For example + // - `DeclarationRequest` does not modify global state + // - `CodeCompletionRequest` modifies the state of the current code + // completion session but it only makes sense for the client to request + // more results for this completion session after it has received the + // initial results. + messageHandlingQueue.async(barrier: false) { + let cancellationToken = CancellationToken() + + let request = Request(params, id: id, clientID: clientID, cancellation: cancellationToken, reply: { [weak self] result in + reply(result) + if let self { + Task { + await self._logResponse(result, id: id, method: R.method) + } + } + }) + + self._logRequest(request) + + switch request { + case let request as Request: + await self.initialize(request) + case let request as Request: + await self.shutdown(request) + case let request as Request: + await self.workspaceSymbols(request) + case let request as Request: + await self.pollIndex(request) + case let request as Request: + await self.executeCommand(request) + case let request as Request: + await self.incomingCalls(request) + case let request as Request: + await self.outgoingCalls(request) + case let request as Request: + await self.supertypes(request) + case let request as Request: + await self.subtypes(request) + case let request as Request: + await self.withLanguageServiceAndWorkspace(for: request, requestHandler: self.completion, fallback: CompletionList(isIncomplete: false, items: [])) + case let request as Request: + await self.withLanguageServiceAndWorkspace(for: request, requestHandler: self.hover, fallback: nil) + case let request as Request: + await self.withLanguageServiceAndWorkspace(for: request, requestHandler: self.openInterface, fallback: nil) + case let request as Request: + await self.withLanguageServiceAndWorkspace(for: request, requestHandler: self.declaration, fallback: nil) + case let request as Request: + await self.withLanguageServiceAndWorkspace(for: request, requestHandler: self.definition, fallback: .locations([])) + case let request as Request: + await self.withLanguageServiceAndWorkspace(for: request, requestHandler: self.references, fallback: []) + case let request as Request: + await self.withLanguageServiceAndWorkspace(for: request, requestHandler: self.implementation, fallback: .locations([])) + case let request as Request: + await self.withLanguageServiceAndWorkspace(for: request, requestHandler: self.prepareCallHierarchy, fallback: []) + case let request as Request: + await self.withLanguageServiceAndWorkspace(for: request, requestHandler: self.prepareTypeHierarchy, fallback: []) + case let request as Request: + await self.withLanguageServiceAndWorkspace(for: request, requestHandler: self.symbolInfo, fallback: []) + case let request as Request: + await self.withLanguageServiceAndWorkspace(for: request, requestHandler: self.documentSymbolHighlight, fallback: nil) + case let request as Request: + await self.withLanguageServiceAndWorkspace(for: request, requestHandler: self.foldingRange, fallback: nil) + case let request as Request: + await self.withLanguageServiceAndWorkspace(for: request, requestHandler: self.documentSymbol, fallback: nil) + case let request as Request: + await self.withLanguageServiceAndWorkspace(for: request, requestHandler: self.documentColor, fallback: []) + case let request as Request: + await self.withLanguageServiceAndWorkspace(for: request, requestHandler: self.documentSemanticTokens, fallback: nil) + case let request as Request: + await self.withLanguageServiceAndWorkspace(for: request, requestHandler: self.documentSemanticTokensDelta, fallback: nil) + case let request as Request: + await self.withLanguageServiceAndWorkspace(for: request, requestHandler: self.documentSemanticTokensRange, fallback: nil) + case let request as Request: + await self.withLanguageServiceAndWorkspace(for: request, requestHandler: self.colorPresentation, fallback: []) + case let request as Request: + await self.withLanguageServiceAndWorkspace(for: request, requestHandler: self.codeAction, fallback: nil) + case let request as Request: + await self.withLanguageServiceAndWorkspace(for: request, requestHandler: self.inlayHint, fallback: []) + case let request as Request: + await self.withLanguageServiceAndWorkspace(for: request, requestHandler: self.documentDiagnostic, fallback: .full(.init(items: []))) + default: + reply(.failure(ResponseError.methodNotFound(R.method))) + } + } + } + + private nonisolated func _logRequest(_ request: Request) { logAsync { currentLevel in guard currentLevel >= LogLevel.debug else { return "\(type(of: self)): Request<\(R.method)(\(request.id))>"