mirror of
https://github.com/apple/sourcekit-lsp.git
synced 2026-03-02 18:23:24 +01:00
Since the connection and message handler have a reciprocal need to know about each other, move the closeHandler so that it has the opportunity to call a method on the message handler if desired.
368 lines
12 KiB
Swift
368 lines
12 KiB
Swift
//===----------------------------------------------------------------------===//
|
|
//
|
|
// This source file is part of the Swift.org open source project
|
|
//
|
|
// Copyright (c) 2014 - 2018 Apple Inc. and the Swift project authors
|
|
// Licensed under Apache License v2.0 with Runtime Library Exception
|
|
//
|
|
// See https://swift.org/LICENSE.txt for license information
|
|
// See https://swift.org/CONTRIBUTORS.txt for the list of Swift project authors
|
|
//
|
|
//===----------------------------------------------------------------------===//
|
|
|
|
import Dispatch
|
|
import Foundation
|
|
import LanguageServerProtocol
|
|
import LSPLogging
|
|
|
|
/// A connection between a message handler (e.g. language server) in the same process as the connection object and a remote message handler (e.g. language client) that may run in another process using JSON RPC messages sent over a pair of in/out file descriptors.
|
|
///
|
|
/// For example, inside a language server, the `JSONRPCConnection` takes the language service implemenation as its `receiveHandler` and itself provides the client connection for sending notifications and callbacks.
|
|
public final class JSONRPCConection {
|
|
|
|
var receiveHandler: MessageHandler? = nil
|
|
let queue: DispatchQueue = DispatchQueue(label: "jsonrpc-queue", qos: .userInitiated)
|
|
let sendQueue: DispatchQueue = DispatchQueue(label: "jsonrpc-send-queue", qos: .userInitiated)
|
|
let receiveIO: DispatchIO
|
|
let sendIO: DispatchIO
|
|
let messageRegistry: MessageRegistry
|
|
|
|
/// *For Testing* Whether to wait for requests to finish before handling the next message.
|
|
let syncRequests: Bool
|
|
|
|
enum State {
|
|
case created, running, closed
|
|
}
|
|
|
|
/// Current state of the connection, used to ensure correct usage.
|
|
var state: State
|
|
|
|
/// *Public for testing* Buffer of received bytes that haven't been parsed.
|
|
public var _requestBuffer: [UInt8] = []
|
|
|
|
private var _nextRequestID: Int = 0
|
|
|
|
struct OutstandingRequest {
|
|
var requestType: _RequestType.Type
|
|
var responseType: ResponseType.Type
|
|
var queue: DispatchQueue
|
|
var replyHandler: (LSPResult<Any>) -> Void
|
|
}
|
|
|
|
/// The set of currently outstanding outgoing requests along with information about how to decode and handle their responses.
|
|
var outstandingRequests: [RequestID: OutstandingRequest] = [:]
|
|
|
|
var closeHandler: (() -> Void)! = nil
|
|
|
|
public init(
|
|
protocol messageRegistry: MessageRegistry,
|
|
inFD: Int32,
|
|
outFD: Int32,
|
|
syncRequests: Bool = false)
|
|
{
|
|
state = .created
|
|
self.messageRegistry = messageRegistry
|
|
self.syncRequests = syncRequests
|
|
|
|
receiveIO = DispatchIO(type: .stream, fileDescriptor: inFD, queue: queue) { (error: Int32) in
|
|
if error != 0 {
|
|
log("IO error \(error)", level: .error)
|
|
}
|
|
}
|
|
|
|
sendIO = DispatchIO(type: .stream, fileDescriptor: outFD, queue: sendQueue) { (error: Int32) in
|
|
if error != 0 {
|
|
log("IO error \(error)", level: .error)
|
|
}
|
|
}
|
|
|
|
// We cannot assume the client will send us bytes in packets of any particular size, so set the lower limit to 1.
|
|
receiveIO.setLimit(lowWater: 1)
|
|
receiveIO.setLimit(highWater: Int.max)
|
|
|
|
sendIO.setLimit(lowWater: 1)
|
|
sendIO.setLimit(highWater: Int.max)
|
|
}
|
|
|
|
deinit {
|
|
assert(state == .closed)
|
|
}
|
|
|
|
/// Start processing `inFD` and send messages to `receiveHandler`.
|
|
///
|
|
/// - parameter receiveHandler: The message handler to invoke for requests received on the `inFD`.
|
|
public func start(receiveHandler: MessageHandler, closeHandler: @escaping () -> Void = {}) {
|
|
precondition(state == .created)
|
|
state = .running
|
|
self.receiveHandler = receiveHandler
|
|
self.closeHandler = closeHandler
|
|
|
|
receiveIO.read(offset: 0, length: Int.max, queue: queue) { done, data, errorCode in
|
|
guard errorCode == 0 else {
|
|
log("IO error \(errorCode)", level: .error)
|
|
if done { self._close() }
|
|
return
|
|
}
|
|
|
|
if done {
|
|
self._close()
|
|
return
|
|
}
|
|
|
|
guard let data = data, !data.isEmpty else {
|
|
return
|
|
}
|
|
|
|
// Parse and handle any messages in `buffer + data`, leaving any remaining unparsed bytes in `buffer`.
|
|
if self._requestBuffer.isEmpty {
|
|
data.withUnsafeBytes { (pointer: UnsafePointer<UInt8>) in
|
|
let rest = self.parseAndHandleMessages(from: UnsafeBufferPointer(start: pointer, count: data.count))
|
|
self._requestBuffer.append(contentsOf: rest)
|
|
}
|
|
} else {
|
|
self._requestBuffer.append(contentsOf: data)
|
|
var unused = 0
|
|
self._requestBuffer.withUnsafeBufferPointer { buffer in
|
|
let rest = self.parseAndHandleMessages(from: buffer)
|
|
unused = rest.count
|
|
}
|
|
self._requestBuffer.removeFirst(self._requestBuffer.count - unused)
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Whether we can send messages in the current state.
|
|
///
|
|
/// - parameter shouldLog: Whether to log an info message if not ready.
|
|
func readyToSend(shouldLog: Bool = true) -> Bool {
|
|
precondition(state != .created, "tried to send message before calling start(messageHandler:)")
|
|
let ready = state == .running
|
|
if shouldLog && !ready {
|
|
log("ignoring message; state = \(state)")
|
|
}
|
|
return ready
|
|
}
|
|
|
|
/// Parse and handle all messages in `bytes`, returning a slice containing any remaining incomplete data.
|
|
func parseAndHandleMessages(from bytes: UnsafeBufferPointer<UInt8>) -> UnsafeBufferPointer<UInt8>.SubSequence {
|
|
|
|
let decoder = JSONDecoder()
|
|
|
|
// Set message registry to use for model decoding.
|
|
decoder.userInfo[.messageRegistryKey] = messageRegistry
|
|
|
|
// Setup callback for response type.
|
|
decoder.userInfo[.responseTypeCallbackKey] = { id in
|
|
guard let outstanding = self.outstandingRequests[id] else {
|
|
log("Unknown request for \(id)", level: .error)
|
|
return nil
|
|
}
|
|
return outstanding.responseType
|
|
} as JSONRPCMessage.ResponseTypeCallback
|
|
|
|
var bytes = bytes[...]
|
|
|
|
MESSAGE_LOOP: while true {
|
|
do {
|
|
guard let ((messageBytes, _), rest) = try bytes.jsonrpcSplitMessage() else {
|
|
return bytes
|
|
}
|
|
bytes = rest
|
|
|
|
let pointer = UnsafeMutableRawPointer(mutating: UnsafeBufferPointer(rebasing: messageBytes).baseAddress!)
|
|
let message = try decoder.decode(JSONRPCMessage.self, from: Data(bytesNoCopy: pointer, count: messageBytes.count, deallocator: .none))
|
|
|
|
handle(message)
|
|
|
|
} catch let error as MessageDecodingError {
|
|
|
|
switch error.messageKind {
|
|
case .request:
|
|
if let id = error.id {
|
|
send { encoder in
|
|
try encoder.encode(JSONRPCMessage.errorResponse(ResponseError(error), id: id))
|
|
}
|
|
continue MESSAGE_LOOP
|
|
}
|
|
case .response:
|
|
if let id = error.id {
|
|
if let outstanding = self.outstandingRequests.removeValue(forKey: id) {
|
|
outstanding.replyHandler(.failure(ResponseError(error)))
|
|
} else {
|
|
log("error in response to unknown request \(id) \(error)", level: .error)
|
|
}
|
|
continue MESSAGE_LOOP
|
|
}
|
|
case .notification:
|
|
if error.code == .methodNotFound {
|
|
log("ignoring unknown notification \(error)")
|
|
continue MESSAGE_LOOP
|
|
}
|
|
case .unknown:
|
|
break
|
|
}
|
|
// FIXME: graceful shutdown?
|
|
fatalError("fatal error encountered decoding message \(error)")
|
|
|
|
} catch {
|
|
// FIXME: graceful shutdown?
|
|
fatalError("fatal error encountered decoding message \(error)")
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Handle a single message by dispatching it to `receiveHandler` or an appropriate reply handler.
|
|
func handle(_ message: JSONRPCMessage) {
|
|
switch message {
|
|
case .notification(let notification):
|
|
notification._handle(receiveHandler!, connection: self)
|
|
case .request(let request, id: let id):
|
|
let semaphore: DispatchSemaphore? = syncRequests ? .init(value: 0) : nil
|
|
request._handle(receiveHandler!, id: id, connection: self) { (response, id) in
|
|
self.sendReply(response, id: id)
|
|
semaphore?.signal()
|
|
}
|
|
semaphore?.wait()
|
|
|
|
case .response(let response, id: let id):
|
|
guard let outstanding = outstandingRequests.removeValue(forKey: id) else {
|
|
log("Unknown request for \(id)", level: .error)
|
|
return
|
|
}
|
|
outstanding.replyHandler(.success(response))
|
|
case .errorResponse(let error, id: let id):
|
|
guard let outstanding = outstandingRequests.removeValue(forKey: id) else {
|
|
log("Unknown request for \(id)", level: .error)
|
|
return
|
|
}
|
|
outstanding.replyHandler(.failure(error))
|
|
}
|
|
}
|
|
|
|
/// *Public for testing*.
|
|
public func send(_rawData dispatchData: DispatchData) {
|
|
guard readyToSend() else { return }
|
|
|
|
sendIO.write(offset: 0, data: dispatchData, queue: sendQueue) { [weak self] done, _, errorCode in
|
|
if errorCode != 0 {
|
|
log("IO error sending message \(errorCode)", level: .error)
|
|
if done {
|
|
self?.queue.async {
|
|
self?._close()
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func send(messageData: Data) {
|
|
|
|
var dispatchData = DispatchData.empty
|
|
let header = "Content-Length: \(messageData.count)\r\n\r\n"
|
|
header.utf8.map{$0}.withUnsafeBytes { buffer in
|
|
dispatchData.append(buffer)
|
|
}
|
|
messageData.withUnsafeBytes { rawBufferPointer in
|
|
dispatchData.append(rawBufferPointer)
|
|
}
|
|
|
|
send(_rawData: dispatchData)
|
|
}
|
|
|
|
func send(encoding: (JSONEncoder) throws -> Data) {
|
|
guard readyToSend() else { return }
|
|
|
|
let encoder = JSONEncoder()
|
|
|
|
let data: Data
|
|
do {
|
|
data = try encoding(encoder)
|
|
|
|
} catch {
|
|
// FIXME: attempt recovery?
|
|
fatalError("unexpected error while encoding response: \(error)")
|
|
}
|
|
|
|
send(messageData: data)
|
|
}
|
|
|
|
/// Close the connection.
|
|
public func close() {
|
|
queue.sync { _close() }
|
|
}
|
|
|
|
/// Close the connection. *Must be called on `queue`.*
|
|
func _close() {
|
|
sendQueue.sync {
|
|
guard state == .running else { return }
|
|
state = .closed
|
|
|
|
log("\(JSONRPCConection.self): closing...")
|
|
receiveIO.close(flags: .stop)
|
|
sendIO.close(flags: .stop)
|
|
receiveHandler = nil // break retain cycle
|
|
closeHandler()
|
|
}
|
|
}
|
|
|
|
/// Request id for the next outgoing request.
|
|
func nextRequestID() -> RequestID {
|
|
_nextRequestID += 1
|
|
return .number(_nextRequestID)
|
|
}
|
|
|
|
}
|
|
|
|
extension JSONRPCConection: Connection {
|
|
// MARK: Connection interface
|
|
|
|
public func send<Notification>(_ notification: Notification) where Notification: NotificationType {
|
|
guard readyToSend() else { return }
|
|
send { encoder in
|
|
return try encoder.encode(JSONRPCMessage.notification(notification))
|
|
}
|
|
}
|
|
|
|
public func send<Request>(_ request: Request, queue: DispatchQueue, reply: @escaping (LSPResult<Request.Response>) -> Void) -> RequestID where Request: RequestType {
|
|
|
|
let id: RequestID = self.queue.sync {
|
|
let id = nextRequestID()
|
|
|
|
guard readyToSend() else {
|
|
reply(.failure(.cancelled))
|
|
return id
|
|
}
|
|
|
|
outstandingRequests[id] = OutstandingRequest(
|
|
requestType: Request.self,
|
|
responseType: Request.Response.self,
|
|
queue: queue,
|
|
replyHandler: { anyResult in
|
|
queue.async {
|
|
reply(anyResult.map { $0 as! Request.Response })
|
|
}
|
|
})
|
|
return id
|
|
}
|
|
|
|
send { encoder in
|
|
return try encoder.encode(JSONRPCMessage.request(request, id: id))
|
|
}
|
|
|
|
return id
|
|
}
|
|
|
|
public func sendReply(_ response: LSPResult<ResponseType>, id: RequestID) {
|
|
guard readyToSend() else { return }
|
|
|
|
send { encoder in
|
|
switch response {
|
|
case .success(let result):
|
|
return try encoder.encode(JSONRPCMessage.response(result, id: id))
|
|
case .failure(let error):
|
|
return try encoder.encode(JSONRPCMessage.errorResponse(error, id: id))
|
|
}
|
|
}
|
|
}
|
|
}
|