From c254d1385b161df49944551636ec61c7936e8cda Mon Sep 17 00:00:00 2001 From: Fabian Fett Date: Wed, 28 Aug 2024 23:43:17 +0200 Subject: [PATCH 01/16] NewLambdaRuntimeClient --- .../LambdaRuntimeClient.swift | 14 +- .../NewLambdaRuntimeClient.swift | 541 ++++++++++++++++++ .../NewLambdaRuntimeClientTests.swift | 98 ++++ 3 files changed, 649 insertions(+), 4 deletions(-) create mode 100644 Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeClient.swift create mode 100644 Tests/AWSLambdaRuntimeCoreTests/NewLambdaRuntimeClientTests.swift diff --git a/Sources/AWSLambdaRuntimeCore/LambdaRuntimeClient.swift b/Sources/AWSLambdaRuntimeCore/LambdaRuntimeClient.swift index 36f88229..19b56c1f 100644 --- a/Sources/AWSLambdaRuntimeCore/LambdaRuntimeClient.swift +++ b/Sources/AWSLambdaRuntimeCore/LambdaRuntimeClient.swift @@ -142,8 +142,14 @@ extension LambdaRuntimeClient { static let defaultHeaders = HTTPHeaders([("user-agent", "Swift-Lambda/Unknown")]) /// These headers must be sent along an invocation or initialization error report - static let errorHeaders = HTTPHeaders([ - ("user-agent", "Swift-Lambda/Unknown"), - ("lambda-runtime-function-error-type", "Unhandled"), - ]) + static let errorHeaders: HTTPHeaders = [ + "user-agent": "Swift-Lambda/Unknown", + "lambda-runtime-function-error-type": "Unhandled", + ] + + /// These headers must be sent along an invocation or initialization error report + static let streamingHeaders: HTTPHeaders = [ + "user-agent": "Swift-Lambda/Unknown", + "transfer-encoding": "streaming", + ] } diff --git a/Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeClient.swift b/Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeClient.swift new file mode 100644 index 00000000..ee765ef0 --- /dev/null +++ b/Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeClient.swift @@ -0,0 +1,541 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftAWSLambdaRuntime open source project +// +// Copyright (c) 2024 Apple Inc. and the SwiftAWSLambdaRuntime project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import NIOCore +import NIOHTTP1 +import NIOPosix +import Logging +import _NIOBase64 + +final actor NewLambdaRuntimeClient: LambdaRuntimeClientProtocol { + nonisolated let unownedExecutor: UnownedSerialExecutor + + struct Configuration { + var ip: String + var port: Int + } + + struct Writer: LambdaResponseStreamWriter { + private var runtimeClient: NewLambdaRuntimeClient + + fileprivate init(runtimeClient: NewLambdaRuntimeClient) { + self.runtimeClient = runtimeClient + } + + func write(_ buffer: NIOCore.ByteBuffer) async throws { + try await self.runtimeClient.write(buffer) + } + + func finish() async throws { + try await self.runtimeClient.finish() + } + + func writeAndFinish(_ buffer: NIOCore.ByteBuffer) async throws { + try await self.runtimeClient.writeAndFinish(buffer) + } + + func reportError(_ error: any Error) async throws { + try await self.runtimeClient.reportError(error) + } + } + + private enum ConnectionState { + case disconnected + case connecting([CheckedContinuation]) + case connected(Channel, LambdaChannelHandler) + } + + private let eventLoop: any EventLoop + private let logger: Logger + private let configuration: Configuration + private var connectionState: ConnectionState = .disconnected + + static func withRuntimeClient( + configuration: Configuration, + eventLoop: any EventLoop, + logger: Logger, + _ body: (NewLambdaRuntimeClient) async throws -> Result + ) async throws -> Result { + let runtime = NewLambdaRuntimeClient(configuration: configuration, eventLoop: eventLoop, logger: logger) + let result: Swift.Result + do { + result = .success(try await body(runtime)) + } catch { + result = .failure(error) + } + + //try? await runtime.close() + return try result.get() + } + + private init(configuration: Configuration, eventLoop: any EventLoop, logger: Logger) { + self.unownedExecutor = eventLoop.executor.asUnownedSerialExecutor() + self.configuration = configuration + self.eventLoop = eventLoop + self.logger = logger + } + + func nextInvocation() async throws -> (Invocation, Writer) { + let handler = try await self.makeOrGetConnection() + + return try await (handler.nextInvocation(), Writer(runtimeClient: self)) + } + + private func write(_ buffer: NIOCore.ByteBuffer) async throws { + let handler = try await self.makeOrGetConnection() + return try await handler.writeResponseBodyPart(buffer) + } + + private func finish() async throws { + let handler = try await self.makeOrGetConnection() + return try await handler.finishResponseRequest(finalData: nil) + } + + private func writeAndFinish(_ buffer: NIOCore.ByteBuffer) async throws { + let handler = try await self.makeOrGetConnection() + return try await handler.finishResponseRequest(finalData: buffer) + } + + private func reportError(_ error: any Error) async throws { + let handler = try await self.makeOrGetConnection() + return try await handler.reportError(error) + } + + private func makeOrGetConnection() async throws -> LambdaChannelHandler { + switch self.connectionState { + case .disconnected: + self.connectionState = .connecting([]) + break + case .connecting(var array): + return try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) in + array.append(continuation) + self.connectionState = .connecting(array) + } + case .connected(_, let handler): + return handler + } + + let bootstrap = ClientBootstrap(group: self.eventLoop) + .channelInitializer { channel in + do { + try channel.pipeline.syncOperations.addHTTPClientHandlers() + // Lambda quotas... An invocation payload is maximal 6MB in size: + // https://docs.aws.amazon.com/lambda/latest/dg/gettingstarted-limits.html + try channel.pipeline.syncOperations.addHandler( + NIOHTTPClientResponseAggregator(maxContentLength: 6 * 1024 * 1024) + ) + try channel.pipeline.syncOperations.addHandler(LambdaChannelHandler(logger: self.logger)) + return channel.eventLoop.makeSucceededFuture(()) + } catch { + return channel.eventLoop.makeFailedFuture(error) + } + } + + do { + // connect directly via socket address to avoid happy eyeballs (perf) + let address = try SocketAddress(ipAddress: self.configuration.ip, port: self.configuration.port) + let channel = try await bootstrap.connect(to: address).get() + let handler = try channel.pipeline.syncOperations.handler(type: LambdaChannelHandler.self) + + switch self.connectionState { + case .disconnected, .connected: + fatalError("Unexpected state: \(self.connectionState)") + + case .connecting(let array): + self.connectionState = .connected(channel, handler) + defer { + for continuation in array { + continuation.resume(returning: handler) + } + } + return handler + } + } catch { + switch self.connectionState { + case .disconnected, .connected: + fatalError("Unexpected state: \(self.connectionState)") + + case .connecting(let array): + self.connectionState = .disconnected + defer { + for continuation in array { + continuation.resume(throwing: error) + } + } + throw error + } + } + } +} + +// no need in locks since we validate only one request can run at a time +private final class LambdaChannelHandler { + let nextInvocationPath = Consts.invocationURLPrefix + Consts.getNextInvocationURLSuffix + + enum State { + case disconnected + case connected(ChannelHandlerContext, LambdaState) + + enum LambdaState { + case idle(previousRequestID: String?) + case waitingForNextInvocation(CheckedContinuation) + case waitingForResponse(requestID: String) + case sendingResponse(requestID: String) + case sentResponse(requestID: String, CheckedContinuation) + case closing + } + } + + private var state: State = .disconnected + private var lastError: Error? + private var reusableErrorBuffer: ByteBuffer? + private let logger: Logger + + init(logger: Logger) { + self.logger = logger + } + + func nextInvocation(isolation: isolated (any Actor)? = #isolation) async throws -> Invocation { + switch self.state { + case .connected(let context, .idle): + return try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) in + self.state = .connected(context, .waitingForNextInvocation(continuation)) + self.sendNextRequest(context: context) + } + + case .connected(_, .closing), + .connected(_, .sendingResponse), + .connected(_, .sentResponse), + .connected(_, .waitingForNextInvocation), + .connected(_, .waitingForResponse): + fatalError() + + case .disconnected: + // TODO: throw error here + fatalError() + } + } + + func reportError(isolation: isolated (any Actor)? = #isolation, _ error: any Error) async throws { + switch self.state { + case .connected(_, .idle(.none)), + .connected(_, .waitingForNextInvocation): + fatalError("Invalid state: \(self.state)") + + case .connected(let context, .waitingForResponse(let requestID)): + try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) in + self.state = .connected(context, .sentResponse(requestID: requestID, continuation)) + self.sendReportErrorRequest(requestID: requestID, error: error, context: context) + } + + case .connected(let context, .sendingResponse(let requestID)): + try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) in + self.state = .connected(context, .sentResponse(requestID: requestID, continuation)) + self.sendResponseStreamingFailure(error: error, context: context) + } + + case .connected(_, .idle(previousRequestID: .some(let requestID))), + .connected(_, .sentResponse(let requestID, _)): + // The final response has already been sent. The only way to report the unhandled error + // now is to log it. Normally this library never logs higher than debug, we make an + // exception here, as there is no other way of reporting the error otherwise. + self.logger.error("Unhandled error after stream has finished", metadata: [ + "lambda_request_id": "\(requestID)", + "lambda_error": "\(String(describing: error))" + ]) + + case .disconnected, .connected(_, .closing): + // TODO: throw error here + fatalError() + } + } + + func writeResponseBodyPart(isolation: isolated (any Actor)? = #isolation, _ byteBuffer: ByteBuffer) async throws { + switch self.state { + case .connected(_, .idle(.none)), + .connected(_, .waitingForNextInvocation): + fatalError("Invalid state: \(self.state)") + + case .connected(let context, .waitingForResponse(let requestID)): + self.state = .connected(context, .sendingResponse(requestID: requestID)) + try await self.sendResponseBodyPart(byteBuffer, sendHeadWithRequestID: requestID, context: context) + + case .connected(let context, .sendingResponse): + try await self.sendResponseBodyPart(byteBuffer, sendHeadWithRequestID: nil, context: context) + + case .connected(_, .idle(previousRequestID: .some(let requestID))), + .connected(_, .sentResponse(let requestID, _)): + // TODO: throw error here – user tries to write after the stream has been finished + fatalError() + + + case .disconnected, .connected(_, .closing): + // TODO: throw error here + fatalError() + } + } + + func finishResponseRequest(isolation: isolated (any Actor)? = #isolation, finalData: ByteBuffer?) async throws { + switch self.state { + case .connected(_, .idle(.none)), + .connected(_, .waitingForNextInvocation): + fatalError("Invalid state: \(self.state)") + + case .connected(let context, .waitingForResponse(let requestID)): + try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) in + self.state = .connected(context, .sentResponse(requestID: requestID, continuation)) + self.sendResponseFinish(finalData, sendHeadWithRequestID: requestID, context: context) + } + + case .connected(let context, .sendingResponse(let requestID)): + try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) in + self.state = .connected(context, .sentResponse(requestID: requestID, continuation)) + self.sendResponseFinish(finalData, sendHeadWithRequestID: nil, context: context) + } + + case .connected(_, .idle(previousRequestID: .some(let requestID))), + .connected(_, .sentResponse(let requestID, _)): + // TODO: throw error here – user tries to write after the stream has been finished + fatalError() + + + case .disconnected, .connected(_, .closing): + // TODO: throw error here + fatalError() + } + } + + private func sendResponseBodyPart( + isolation: isolated (any Actor)? = #isolation, + _ byteBuffer: ByteBuffer, + sendHeadWithRequestID: String?, + context: ChannelHandlerContext + ) async throws { + + if let requestID = sendHeadWithRequestID { + // TODO: This feels super expensive. We should be able to make this cheaper. requestIDs are fixed length + let url = Consts.invocationURLPrefix + "/" + requestID + Consts.postResponseURLSuffix + + let httpRequest = HTTPRequestHead( + version: .http1_1, + method: .POST, + uri: url, + headers: LambdaRuntimeClient.streamingHeaders + ) + + context.write(self.wrapOutboundOut(.head(httpRequest)), promise: nil) + } + + let future = context.write(self.wrapOutboundOut(.body(.byteBuffer(byteBuffer)))) + context.flush() + try await future.get() + } + + private func sendResponseFinish( + isolation: isolated (any Actor)? = #isolation, + _ byteBuffer: ByteBuffer?, + sendHeadWithRequestID: String?, + context: ChannelHandlerContext + ) { + + if let requestID = sendHeadWithRequestID { + // TODO: This feels super expensive. We should be able to make this cheaper. requestIDs are fixed length + let url = Consts.invocationURLPrefix + "/" + requestID + Consts.postResponseURLSuffix + + let headers: HTTPHeaders = if byteBuffer?.readableBytes ?? 0 < 6_000_000 { + [ + "user-agent": "Swift-Lambda/Unknown", + "content-length": "\(byteBuffer?.readableBytes ?? 0)", + ] + } else { + LambdaRuntimeClient.streamingHeaders + } + + let httpRequest = HTTPRequestHead( + version: .http1_1, + method: .POST, + uri: url, + headers: headers + ) + + context.write(self.wrapOutboundOut(.head(httpRequest)), promise: nil) + } + + if let byteBuffer { + context.write(self.wrapOutboundOut(.body(.byteBuffer(byteBuffer))), promise: nil) + } + + context.write(self.wrapOutboundOut(.end(nil)), promise: nil) + context.flush() + } + + private func sendNextRequest(context: ChannelHandlerContext) { + let httpRequest = HTTPRequestHead(version: .http1_1, method: .GET, uri: self.nextInvocationPath, headers: LambdaRuntimeClient.defaultHeaders) + + context.write(self.wrapOutboundOut(.head(httpRequest)), promise: nil) + context.write(self.wrapOutboundOut(.end(nil)), promise: nil) + context.flush() + } + + private func sendReportErrorRequest(requestID: String, error: any Error, context: ChannelHandlerContext) { + // TODO: This feels super expensive. We should be able to make this cheaper. requestIDs are fixed length + let url = Consts.invocationURLPrefix + "/" + requestID + Consts.postErrorURLSuffix + + let httpRequest = HTTPRequestHead( + version: .http1_1, + method: .POST, + uri: url, + headers: LambdaRuntimeClient.errorHeaders + ) + + if self.reusableErrorBuffer == nil { + self.reusableErrorBuffer = context.channel.allocator.buffer(capacity: 1024) + } else { + self.reusableErrorBuffer!.clear() + } + + let errorResponse = ErrorResponse(errorType: Consts.functionError, errorMessage: "\(error)") + // TODO: Write this directly into our ByteBuffer + let bytes = errorResponse.toJSONBytes() + self.reusableErrorBuffer!.writeBytes(bytes) + + context.write(self.wrapOutboundOut(.head(httpRequest)), promise: nil) + context.write(self.wrapOutboundOut(.body(.byteBuffer(self.reusableErrorBuffer!))), promise: nil) + context.write(self.wrapOutboundOut(.end(nil)), promise: nil) + context.flush() + } + + private func sendResponseStreamingFailure(error: any Error, context: ChannelHandlerContext) { + // TODO: Use base64 here + let trailers: HTTPHeaders = [ + "Lambda-Runtime-Function-Error-Type": "Unhandled", + "Lambda-Runtime-Function-Error-Body": "Requires base64", + ] + + context.write(self.wrapOutboundOut(.end(trailers)), promise: nil) + context.flush() + } + + func cancelCurrentRequestAndCloseConnection() { + fatalError("Unimplemented") + } +} + +extension LambdaChannelHandler: ChannelInboundHandler { + typealias OutboundIn = Never + typealias InboundIn = NIOHTTPClientResponseFull + typealias OutboundOut = HTTPClientRequestPart + + func handlerAdded(context: ChannelHandlerContext) { + if context.channel.isActive { + self.state = .connected(context, .idle(previousRequestID: nil)) + } + } + + func channelActive(context: ChannelHandlerContext) { + switch self.state { + case .disconnected: + self.state = .connected(context, .idle(previousRequestID: nil)) + case .connected: + break + } + } + + func channelRead(context: ChannelHandlerContext, data: NIOAny) { + let response = unwrapInboundIn(data) + + switch self.state { + case .connected(let context, .waitingForNextInvocation(let continuation)): + do { + let metadata = try InvocationMetadata(headers: response.head.headers) + self.state = .connected(context, .waitingForResponse(requestID: metadata.requestID)) + continuation.resume(returning: Invocation(metadata: metadata, event: response.body ?? ByteBuffer())) + } catch { + self.state = .connected(context, .closing) + fatalError("TODO: fail continuation with better error") + } + + case .connected(let context, .sentResponse(let requestID, let continuation)): + if response.head.status == .accepted { + self.state = .connected(context, .idle(previousRequestID: requestID)) + continuation.resume() + } + + case .disconnected, .connected(_, _): + break + } + +// // As defined in RFC 7230 Section 6.3: +// // HTTP/1.1 defaults to the use of "persistent connections", allowing +// // multiple requests and responses to be carried over a single +// // connection. The "close" connection option is used to signal that a +// // connection will not persist after the current request/response. HTTP +// // implementations SHOULD support persistent connections. +// // +// // That's why we only assume the connection shall be closed if we receive +// // a "connection = close" header. +// let serverCloseConnection = +// response.head.headers["connection"].contains(where: { $0.lowercased() == "close" }) +// +// let closeConnection = serverCloseConnection || response.head.version != .http1_1 +// +// if closeConnection { +// // If we were succeeding the request promise here directly and closing the connection +// // after succeeding the promise we may run into a race condition: +// // +// // The lambda runtime will ask for the next work item directly after a succeeded post +// // response request. The desire for the next work item might be faster than the attempt +// // to close the connection. This will lead to a situation where we try to the connection +// // but the next request has already been scheduled on the connection that we want to +// // close. For this reason we postpone succeeding the promise until the connection has +// // been closed. This codepath will only be hit in the very, very unlikely event of the +// // Lambda control plane demanding to close connection. (It's more or less only +// // implemented to support http1.1 correctly.) This behavior is ensured with the test +// // `LambdaTest.testNoKeepAliveServer`. +// self.state = .waitForConnectionClose(httpResponse, promise) +// _ = context.channel.close() +// return +// } else { +// self.state = .idle +// promise.succeed(httpResponse) +// } + } + + func errorCaught(context: ChannelHandlerContext, error: Error) { + // pending responses will fail with lastError in channelInactive since we are calling context.close + self.lastError = error + context.channel.close(promise: nil) + } + + func channelInactive(context: ChannelHandlerContext) { + // fail any pending responses with last error or assume peer disconnected + context.fireChannelInactive() + +// switch self.state { +// case .idle: +// break +// +// case .running(let promise, let timeout): +// self.state = .idle +// timeout?.cancel() +// promise.fail(self.lastError ?? HTTPClient.Errors.connectionResetByPeer) +// +// case .waitForConnectionClose(let response, let promise): +// self.state = .idle +// promise.succeed(response) +// } + } +} + +private struct RequestCancelEvent {} diff --git a/Tests/AWSLambdaRuntimeCoreTests/NewLambdaRuntimeClientTests.swift b/Tests/AWSLambdaRuntimeCoreTests/NewLambdaRuntimeClientTests.swift new file mode 100644 index 00000000..734d1a9e --- /dev/null +++ b/Tests/AWSLambdaRuntimeCoreTests/NewLambdaRuntimeClientTests.swift @@ -0,0 +1,98 @@ +// +// NewLambdaRuntimeClientTests.swift +// swift-aws-lambda-runtime +// +// Created by Fabian Fett on 28.08.24. +// + +import Testing +import NIOCore +import NIOPosix +import Logging +import struct Foundation.UUID +@testable import AWSLambdaRuntimeCore + +@Suite +struct NewLambdaRuntimeClientTests { + + let logger = Logger(label: "NewLambdaClientRuntimeTest") + + init() { + + } + + @Test + func testSimpleInvocations() async throws { + struct HappyBehavior: LambdaServerBehavior { + let requestId = UUID().uuidString + let event = "hello" + + func getInvocation() -> GetInvocationResult { + .success((self.requestId, self.event)) + } + + func processResponse(requestId: String, response: String?) -> Result { + #expect(self.requestId == requestId) + #expect(self.event == response) + return .success(()) + } + + func processError(requestId: String, error: ErrorResponse) -> Result { + Issue.record("should not report error") + return .failure(.internalServerError) + } + + func processInitError(error: ErrorResponse) -> Result { + Issue.record("should not report init error") + return .failure(.internalServerError) + } + } + + try await self.withMockServer(behaviour: HappyBehavior()) { mockServer, eventLoopGroup in + let configuration = NewLambdaRuntimeClient.Configuration(ip: "127.0.0.1", port: 7000) + + try await NewLambdaRuntimeClient.withRuntimeClient( + configuration: configuration, eventLoop: eventLoopGroup.next(), + logger: self.logger + ) { runtimeClient in + do { + let (invocation, writer) = try await runtimeClient.nextInvocation() + let expected = ByteBuffer(string: "hello") + #expect(invocation.event == expected) + try await writer.writeAndFinish(expected) + } + + do { + let (invocation, writer) = try await runtimeClient.nextInvocation() + let expected = ByteBuffer(string: "hello") + #expect(invocation.event == expected) + try await writer.write(ByteBuffer(string: "h")) + try await writer.write(ByteBuffer(string: "e")) + try await writer.write(ByteBuffer(string: "l")) + try await writer.write(ByteBuffer(string: "l")) + try await writer.write(ByteBuffer(string: "o")) + try await writer.finish() + } + } + } + } + + func withMockServer(behaviour: some LambdaServerBehavior, _ body: (MockLambdaServer, MultiThreadedEventLoopGroup) async throws -> Result) async throws -> Result { + let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1) + let server = MockLambdaServer(behavior: behaviour) + _ = try await server.start().get() + + let result: Swift.Result + do { + result = .success(try await body(server, eventLoopGroup)) + } catch { + result = .failure(error) + } + + try? await server.stop().get() + try? await eventLoopGroup.shutdownGracefully() + + return try result.get() + } + +} From 3831346770f0042f5b42bb70c5f7ed60b44b27df Mon Sep 17 00:00:00 2001 From: Fabian Fett Date: Wed, 28 Aug 2024 23:49:20 +0200 Subject: [PATCH 02/16] Add `NewLambdaRuntimeError` --- .../LambdaRuntimeError.swift | 27 +++++++++++++++++++ 1 file changed, 27 insertions(+) create mode 100644 Sources/AWSLambdaRuntimeCore/LambdaRuntimeError.swift diff --git a/Sources/AWSLambdaRuntimeCore/LambdaRuntimeError.swift b/Sources/AWSLambdaRuntimeCore/LambdaRuntimeError.swift new file mode 100644 index 00000000..dffc67a1 --- /dev/null +++ b/Sources/AWSLambdaRuntimeCore/LambdaRuntimeError.swift @@ -0,0 +1,27 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftAWSLambdaRuntime open source project +// +// Copyright (c) 2024 Apple Inc. and the SwiftAWSLambdaRuntime project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +struct NewLambdaRuntimeError: Error { + enum Code { + case writeAfterFinishHasBeenSent + case finishAfterFinishHasBeenSent + case lostConnectionToControlPlane + case unexpectedStatusCodeForRequest + + } + + var code: Code + + +} From 02920918291ced5a557b9c2e70bfbcd0a27e9391 Mon Sep 17 00:00:00 2001 From: Fabian Fett Date: Fri, 30 Aug 2024 11:52:10 +0200 Subject: [PATCH 03/16] Rename `NewLambdaRuntimeError` --- .../{LambdaRuntimeError.swift => NewLambdaRuntimeError.swift} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename Sources/AWSLambdaRuntimeCore/{LambdaRuntimeError.swift => NewLambdaRuntimeError.swift} (100%) diff --git a/Sources/AWSLambdaRuntimeCore/LambdaRuntimeError.swift b/Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeError.swift similarity index 100% rename from Sources/AWSLambdaRuntimeCore/LambdaRuntimeError.swift rename to Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeError.swift From 69251e8b6412344734726f6dc6a8189038e5667a Mon Sep 17 00:00:00 2001 From: Fabian Fett Date: Fri, 30 Aug 2024 14:01:32 +0200 Subject: [PATCH 04/16] add comment --- Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeClient.swift | 2 ++ 1 file changed, 2 insertions(+) diff --git a/Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeClient.swift b/Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeClient.swift index ee765ef0..5c3a959f 100644 --- a/Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeClient.swift +++ b/Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeClient.swift @@ -118,6 +118,8 @@ final actor NewLambdaRuntimeClient: LambdaRuntimeClientProtocol { self.connectionState = .connecting([]) break case .connecting(var array): + // Since we do get sequential invocations this case normally should never be hit. + // We'll support it anyway. return try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) in array.append(continuation) self.connectionState = .connecting(array) From dad7333a538deda1bba58c717299bbdfdeffd272 Mon Sep 17 00:00:00 2001 From: Fabian Fett Date: Fri, 30 Aug 2024 16:10:41 +0200 Subject: [PATCH 05/16] Fixing soundness --- .../NewLambdaRuntimeClient.swift | 165 +++++++++--------- .../NewLambdaRuntimeError.swift | 3 +- .../NewLambdaRuntimeClientTests.swift | 27 ++- 3 files changed, 108 insertions(+), 87 deletions(-) diff --git a/Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeClient.swift b/Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeClient.swift index 5c3a959f..130507b8 100644 --- a/Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeClient.swift +++ b/Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeClient.swift @@ -12,10 +12,10 @@ // //===----------------------------------------------------------------------===// +import Logging import NIOCore import NIOHTTP1 import NIOPosix -import Logging import _NIOBase64 final actor NewLambdaRuntimeClient: LambdaRuntimeClientProtocol { @@ -36,15 +36,15 @@ final actor NewLambdaRuntimeClient: LambdaRuntimeClientProtocol { func write(_ buffer: NIOCore.ByteBuffer) async throws { try await self.runtimeClient.write(buffer) } - + func finish() async throws { try await self.runtimeClient.finish() } - + func writeAndFinish(_ buffer: NIOCore.ByteBuffer) async throws { try await self.runtimeClient.writeAndFinish(buffer) } - + func reportError(_ error: any Error) async throws { try await self.runtimeClient.reportError(error) } @@ -120,7 +120,8 @@ final actor NewLambdaRuntimeClient: LambdaRuntimeClientProtocol { case .connecting(var array): // Since we do get sequential invocations this case normally should never be hit. // We'll support it anyway. - return try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) in + return try await withCheckedThrowingContinuation { + (continuation: CheckedContinuation) in array.append(continuation) self.connectionState = .connecting(array) } @@ -211,16 +212,17 @@ private final class LambdaChannelHandler { func nextInvocation(isolation: isolated (any Actor)? = #isolation) async throws -> Invocation { switch self.state { case .connected(let context, .idle): - return try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) in + return try await withCheckedThrowingContinuation { + (continuation: CheckedContinuation) in self.state = .connected(context, .waitingForNextInvocation(continuation)) self.sendNextRequest(context: context) } case .connected(_, .closing), - .connected(_, .sendingResponse), - .connected(_, .sentResponse), - .connected(_, .waitingForNextInvocation), - .connected(_, .waitingForResponse): + .connected(_, .sendingResponse), + .connected(_, .sentResponse), + .connected(_, .waitingForNextInvocation), + .connected(_, .waitingForResponse): fatalError() case .disconnected: @@ -232,7 +234,7 @@ private final class LambdaChannelHandler { func reportError(isolation: isolated (any Actor)? = #isolation, _ error: any Error) async throws { switch self.state { case .connected(_, .idle(.none)), - .connected(_, .waitingForNextInvocation): + .connected(_, .waitingForNextInvocation): fatalError("Invalid state: \(self.state)") case .connected(let context, .waitingForResponse(let requestID)): @@ -248,14 +250,17 @@ private final class LambdaChannelHandler { } case .connected(_, .idle(previousRequestID: .some(let requestID))), - .connected(_, .sentResponse(let requestID, _)): + .connected(_, .sentResponse(let requestID, _)): // The final response has already been sent. The only way to report the unhandled error // now is to log it. Normally this library never logs higher than debug, we make an // exception here, as there is no other way of reporting the error otherwise. - self.logger.error("Unhandled error after stream has finished", metadata: [ - "lambda_request_id": "\(requestID)", - "lambda_error": "\(String(describing: error))" - ]) + self.logger.error( + "Unhandled error after stream has finished", + metadata: [ + "lambda_request_id": "\(requestID)", + "lambda_error": "\(String(describing: error))", + ] + ) case .disconnected, .connected(_, .closing): // TODO: throw error here @@ -266,7 +271,7 @@ private final class LambdaChannelHandler { func writeResponseBodyPart(isolation: isolated (any Actor)? = #isolation, _ byteBuffer: ByteBuffer) async throws { switch self.state { case .connected(_, .idle(.none)), - .connected(_, .waitingForNextInvocation): + .connected(_, .waitingForNextInvocation): fatalError("Invalid state: \(self.state)") case .connected(let context, .waitingForResponse(let requestID)): @@ -277,11 +282,10 @@ private final class LambdaChannelHandler { try await self.sendResponseBodyPart(byteBuffer, sendHeadWithRequestID: nil, context: context) case .connected(_, .idle(previousRequestID: .some(let requestID))), - .connected(_, .sentResponse(let requestID, _)): + .connected(_, .sentResponse(let requestID, _)): // TODO: throw error here – user tries to write after the stream has been finished fatalError() - case .disconnected, .connected(_, .closing): // TODO: throw error here fatalError() @@ -291,7 +295,7 @@ private final class LambdaChannelHandler { func finishResponseRequest(isolation: isolated (any Actor)? = #isolation, finalData: ByteBuffer?) async throws { switch self.state { case .connected(_, .idle(.none)), - .connected(_, .waitingForNextInvocation): + .connected(_, .waitingForNextInvocation): fatalError("Invalid state: \(self.state)") case .connected(let context, .waitingForResponse(let requestID)): @@ -307,11 +311,10 @@ private final class LambdaChannelHandler { } case .connected(_, .idle(previousRequestID: .some(let requestID))), - .connected(_, .sentResponse(let requestID, _)): + .connected(_, .sentResponse(let requestID, _)): // TODO: throw error here – user tries to write after the stream has been finished fatalError() - case .disconnected, .connected(_, .closing): // TODO: throw error here fatalError() @@ -355,14 +358,15 @@ private final class LambdaChannelHandler { // TODO: This feels super expensive. We should be able to make this cheaper. requestIDs are fixed length let url = Consts.invocationURLPrefix + "/" + requestID + Consts.postResponseURLSuffix - let headers: HTTPHeaders = if byteBuffer?.readableBytes ?? 0 < 6_000_000 { - [ - "user-agent": "Swift-Lambda/Unknown", - "content-length": "\(byteBuffer?.readableBytes ?? 0)", - ] - } else { - LambdaRuntimeClient.streamingHeaders - } + let headers: HTTPHeaders = + if byteBuffer?.readableBytes ?? 0 < 6_000_000 { + [ + "user-agent": "Swift-Lambda/Unknown", + "content-length": "\(byteBuffer?.readableBytes ?? 0)", + ] + } else { + LambdaRuntimeClient.streamingHeaders + } let httpRequest = HTTPRequestHead( version: .http1_1, @@ -383,7 +387,12 @@ private final class LambdaChannelHandler { } private func sendNextRequest(context: ChannelHandlerContext) { - let httpRequest = HTTPRequestHead(version: .http1_1, method: .GET, uri: self.nextInvocationPath, headers: LambdaRuntimeClient.defaultHeaders) + let httpRequest = HTTPRequestHead( + version: .http1_1, + method: .GET, + uri: self.nextInvocationPath, + headers: LambdaRuntimeClient.defaultHeaders + ) context.write(self.wrapOutboundOut(.head(httpRequest)), promise: nil) context.write(self.wrapOutboundOut(.end(nil)), promise: nil) @@ -478,40 +487,40 @@ extension LambdaChannelHandler: ChannelInboundHandler { break } -// // As defined in RFC 7230 Section 6.3: -// // HTTP/1.1 defaults to the use of "persistent connections", allowing -// // multiple requests and responses to be carried over a single -// // connection. The "close" connection option is used to signal that a -// // connection will not persist after the current request/response. HTTP -// // implementations SHOULD support persistent connections. -// // -// // That's why we only assume the connection shall be closed if we receive -// // a "connection = close" header. -// let serverCloseConnection = -// response.head.headers["connection"].contains(where: { $0.lowercased() == "close" }) -// -// let closeConnection = serverCloseConnection || response.head.version != .http1_1 -// -// if closeConnection { -// // If we were succeeding the request promise here directly and closing the connection -// // after succeeding the promise we may run into a race condition: -// // -// // The lambda runtime will ask for the next work item directly after a succeeded post -// // response request. The desire for the next work item might be faster than the attempt -// // to close the connection. This will lead to a situation where we try to the connection -// // but the next request has already been scheduled on the connection that we want to -// // close. For this reason we postpone succeeding the promise until the connection has -// // been closed. This codepath will only be hit in the very, very unlikely event of the -// // Lambda control plane demanding to close connection. (It's more or less only -// // implemented to support http1.1 correctly.) This behavior is ensured with the test -// // `LambdaTest.testNoKeepAliveServer`. -// self.state = .waitForConnectionClose(httpResponse, promise) -// _ = context.channel.close() -// return -// } else { -// self.state = .idle -// promise.succeed(httpResponse) -// } + // // As defined in RFC 7230 Section 6.3: + // // HTTP/1.1 defaults to the use of "persistent connections", allowing + // // multiple requests and responses to be carried over a single + // // connection. The "close" connection option is used to signal that a + // // connection will not persist after the current request/response. HTTP + // // implementations SHOULD support persistent connections. + // // + // // That's why we only assume the connection shall be closed if we receive + // // a "connection = close" header. + // let serverCloseConnection = + // response.head.headers["connection"].contains(where: { $0.lowercased() == "close" }) + // + // let closeConnection = serverCloseConnection || response.head.version != .http1_1 + // + // if closeConnection { + // // If we were succeeding the request promise here directly and closing the connection + // // after succeeding the promise we may run into a race condition: + // // + // // The lambda runtime will ask for the next work item directly after a succeeded post + // // response request. The desire for the next work item might be faster than the attempt + // // to close the connection. This will lead to a situation where we try to the connection + // // but the next request has already been scheduled on the connection that we want to + // // close. For this reason we postpone succeeding the promise until the connection has + // // been closed. This codepath will only be hit in the very, very unlikely event of the + // // Lambda control plane demanding to close connection. (It's more or less only + // // implemented to support http1.1 correctly.) This behavior is ensured with the test + // // `LambdaTest.testNoKeepAliveServer`. + // self.state = .waitForConnectionClose(httpResponse, promise) + // _ = context.channel.close() + // return + // } else { + // self.state = .idle + // promise.succeed(httpResponse) + // } } func errorCaught(context: ChannelHandlerContext, error: Error) { @@ -524,19 +533,19 @@ extension LambdaChannelHandler: ChannelInboundHandler { // fail any pending responses with last error or assume peer disconnected context.fireChannelInactive() -// switch self.state { -// case .idle: -// break -// -// case .running(let promise, let timeout): -// self.state = .idle -// timeout?.cancel() -// promise.fail(self.lastError ?? HTTPClient.Errors.connectionResetByPeer) -// -// case .waitForConnectionClose(let response, let promise): -// self.state = .idle -// promise.succeed(response) -// } + // switch self.state { + // case .idle: + // break + // + // case .running(let promise, let timeout): + // self.state = .idle + // timeout?.cancel() + // promise.fail(self.lastError ?? HTTPClient.Errors.connectionResetByPeer) + // + // case .waitForConnectionClose(let response, let promise): + // self.state = .idle + // promise.succeed(response) + // } } } diff --git a/Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeError.swift b/Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeError.swift index dffc67a1..88c33393 100644 --- a/Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeError.swift +++ b/Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeError.swift @@ -18,10 +18,9 @@ struct NewLambdaRuntimeError: Error { case finishAfterFinishHasBeenSent case lostConnectionToControlPlane case unexpectedStatusCodeForRequest - + } var code: Code - } diff --git a/Tests/AWSLambdaRuntimeCoreTests/NewLambdaRuntimeClientTests.swift b/Tests/AWSLambdaRuntimeCoreTests/NewLambdaRuntimeClientTests.swift index 734d1a9e..31cbeec8 100644 --- a/Tests/AWSLambdaRuntimeCoreTests/NewLambdaRuntimeClientTests.swift +++ b/Tests/AWSLambdaRuntimeCoreTests/NewLambdaRuntimeClientTests.swift @@ -1,15 +1,24 @@ +//===----------------------------------------------------------------------===// // -// NewLambdaRuntimeClientTests.swift -// swift-aws-lambda-runtime +// This source file is part of the SwiftAWSLambdaRuntime open source project // -// Created by Fabian Fett on 28.08.24. +// Copyright (c) 2024 Apple Inc. and the SwiftAWSLambdaRuntime project authors +// Licensed under Apache License v2.0 // +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// -import Testing +import Logging import NIOCore import NIOPosix -import Logging +import Testing + import struct Foundation.UUID + @testable import AWSLambdaRuntimeCore @Suite @@ -52,7 +61,8 @@ struct NewLambdaRuntimeClientTests { let configuration = NewLambdaRuntimeClient.Configuration(ip: "127.0.0.1", port: 7000) try await NewLambdaRuntimeClient.withRuntimeClient( - configuration: configuration, eventLoop: eventLoopGroup.next(), + configuration: configuration, + eventLoop: eventLoopGroup.next(), logger: self.logger ) { runtimeClient in do { @@ -77,7 +87,10 @@ struct NewLambdaRuntimeClientTests { } } - func withMockServer(behaviour: some LambdaServerBehavior, _ body: (MockLambdaServer, MultiThreadedEventLoopGroup) async throws -> Result) async throws -> Result { + func withMockServer( + behaviour: some LambdaServerBehavior, + _ body: (MockLambdaServer, MultiThreadedEventLoopGroup) async throws -> Result + ) async throws -> Result { let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1) let server = MockLambdaServer(behavior: behaviour) _ = try await server.start().get() From a1dddbe0b7051412cc69fce9154e2daf701052c6 Mon Sep 17 00:00:00 2001 From: Fabian Fett Date: Tue, 3 Sep 2024 17:16:46 +0200 Subject: [PATCH 06/16] Progress --- .../NewLambdaRuntimeClient.swift | 265 ++++++++++++------ .../NewLambdaRuntimeError.swift | 5 + .../AWSLambdaRuntimeCoreTests/ActorTest.swift | 43 +++ .../LambdaTest.swift | 4 +- .../MockLambdaServer.swift | 2 +- Tests/AWSLambdaRuntimeCoreTests/Utils.swift | 4 +- 6 files changed, 240 insertions(+), 83 deletions(-) create mode 100644 Tests/AWSLambdaRuntimeCoreTests/ActorTest.swift diff --git a/Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeClient.swift b/Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeClient.swift index 130507b8..fb90ece8 100644 --- a/Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeClient.swift +++ b/Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeClient.swift @@ -38,7 +38,7 @@ final actor NewLambdaRuntimeClient: LambdaRuntimeClientProtocol { } func finish() async throws { - try await self.runtimeClient.finish() + try await self.runtimeClient.writeAndFinish(nil) } func writeAndFinish(_ buffer: NIOCore.ByteBuffer) async throws { @@ -52,14 +52,28 @@ final actor NewLambdaRuntimeClient: LambdaRuntimeClientProtocol { private enum ConnectionState { case disconnected - case connecting([CheckedContinuation]) - case connected(Channel, LambdaChannelHandler) + case connecting([CheckedContinuation, any Error>]) + case connected(Channel, LambdaChannelHandler) + } + + enum LambdaState { + /// this is the "normal" state. Transitions to `waitingForNextInvocation` + case idle(previousRequestID: String?) + /// this is the state while we wait for an invocation. A next call is running. + /// Transitions to `waitingForResponse` + case waitingForNextInvocation + /// The invocation was forwarded to the handler and we wait for a response. + /// Transitions to `sendingResponse` or `sentResponse`. + case waitingForResponse(requestID: String) + case sendingResponse(requestID: String) + case sentResponse(requestID: String) } private let eventLoop: any EventLoop private let logger: Logger private let configuration: Configuration private var connectionState: ConnectionState = .disconnected + private var lambdaState: LambdaState = .idle(previousRequestID: nil) static func withRuntimeClient( configuration: Configuration, @@ -87,32 +101,99 @@ final actor NewLambdaRuntimeClient: LambdaRuntimeClientProtocol { } func nextInvocation() async throws -> (Invocation, Writer) { - let handler = try await self.makeOrGetConnection() + switch self.lambdaState { + case .idle: + self.lambdaState = .waitingForNextInvocation + let handler = try await self.makeOrGetConnection() + let invocation = try await handler.nextInvocation() + guard case .waitingForNextInvocation = self.lambdaState else { + fatalError("Invalid state: \(self.lambdaState)") + } + self.lambdaState = .waitingForResponse(requestID: invocation.metadata.requestID) + return (invocation, Writer(runtimeClient: self)) + + case .waitingForNextInvocation, + .waitingForResponse, + .sendingResponse, + .sentResponse: + fatalError("Invalid state: \(self.lambdaState)") + } - return try await (handler.nextInvocation(), Writer(runtimeClient: self)) } private func write(_ buffer: NIOCore.ByteBuffer) async throws { - let handler = try await self.makeOrGetConnection() - return try await handler.writeResponseBodyPart(buffer) - } + switch self.lambdaState { + case .idle, .sentResponse: + throw NewLambdaRuntimeError(code: .writeAfterFinishHasBeenSent) + + case .waitingForNextInvocation: + fatalError("Invalid state: \(self.lambdaState)") - private func finish() async throws { - let handler = try await self.makeOrGetConnection() - return try await handler.finishResponseRequest(finalData: nil) + case .waitingForResponse(let requestID): + self.lambdaState = .sendingResponse(requestID: requestID) + fallthrough + + case .sendingResponse(let requestID): + let handler = try await self.makeOrGetConnection() + guard case .sendingResponse(requestID) = self.lambdaState else { + fatalError("Invalid state: \(self.lambdaState)") + } + return try await handler.writeResponseBodyPart(buffer, requestID: requestID) + } } - private func writeAndFinish(_ buffer: NIOCore.ByteBuffer) async throws { - let handler = try await self.makeOrGetConnection() - return try await handler.finishResponseRequest(finalData: buffer) + private func writeAndFinish(_ buffer: NIOCore.ByteBuffer?) async throws { + switch self.lambdaState { + case .idle, .sentResponse: + throw NewLambdaRuntimeError(code: .finishAfterFinishHasBeenSent) + + case .waitingForNextInvocation: + fatalError("Invalid state: \(self.lambdaState)") + + case .waitingForResponse(let requestID): + fallthrough + + case .sendingResponse(let requestID): + self.lambdaState = .sentResponse(requestID: requestID) + let handler = try await self.makeOrGetConnection() + guard case .sentResponse(requestID) = self.lambdaState else { + fatalError("Invalid state: \(self.lambdaState)") + } + try await handler.finishResponseRequest(finalData: buffer, requestID: requestID) + guard case .sentResponse(requestID) = self.lambdaState else { + fatalError("Invalid state: \(self.lambdaState)") + } + self.lambdaState = .idle(previousRequestID: requestID) + } } private func reportError(_ error: any Error) async throws { - let handler = try await self.makeOrGetConnection() - return try await handler.reportError(error) + switch self.lambdaState { + case .idle, .waitingForNextInvocation, .sentResponse: + fatalError("Invalid state: \(self.lambdaState)") + + case .waitingForResponse(let requestID): + fallthrough + + case .sendingResponse(let requestID): + self.lambdaState = .sentResponse(requestID: requestID) + let handler = try await self.makeOrGetConnection() + guard case .sentResponse(requestID) = self.lambdaState else { + fatalError("Invalid state: \(self.lambdaState)") + } + try await handler.reportError(error, requestID: requestID) + guard case .sentResponse(requestID) = self.lambdaState else { + fatalError("Invalid state: \(self.lambdaState)") + } + self.lambdaState = .idle(previousRequestID: requestID) + } } - private func makeOrGetConnection() async throws -> LambdaChannelHandler { + private func channelClosed(_ channel: any Channel) { + // TODO: Fill out + } + + private func makeOrGetConnection() async throws -> LambdaChannelHandler { switch self.connectionState { case .disconnected: self.connectionState = .connecting([]) @@ -121,7 +202,7 @@ final actor NewLambdaRuntimeClient: LambdaRuntimeClientProtocol { // Since we do get sequential invocations this case normally should never be hit. // We'll support it anyway. return try await withCheckedThrowingContinuation { - (continuation: CheckedContinuation) in + (continuation: CheckedContinuation, any Error>) in array.append(continuation) self.connectionState = .connecting(array) } @@ -138,7 +219,7 @@ final actor NewLambdaRuntimeClient: LambdaRuntimeClientProtocol { try channel.pipeline.syncOperations.addHandler( NIOHTTPClientResponseAggregator(maxContentLength: 6 * 1024 * 1024) ) - try channel.pipeline.syncOperations.addHandler(LambdaChannelHandler(logger: self.logger)) + try channel.pipeline.syncOperations.addHandler(LambdaChannelHandler(delegate: self, logger: self.logger)) return channel.eventLoop.makeSucceededFuture(()) } catch { return channel.eventLoop.makeFailedFuture(error) @@ -149,7 +230,13 @@ final actor NewLambdaRuntimeClient: LambdaRuntimeClientProtocol { // connect directly via socket address to avoid happy eyeballs (perf) let address = try SocketAddress(ipAddress: self.configuration.ip, port: self.configuration.port) let channel = try await bootstrap.connect(to: address).get() - let handler = try channel.pipeline.syncOperations.handler(type: LambdaChannelHandler.self) + let handler = try channel.pipeline.syncOperations.handler(type: LambdaChannelHandler.self) + channel.closeFuture.whenComplete { result in + self.eventLoop.preconditionInEventLoop() + self.assumeIsolated { runtimeClient in + runtimeClient.channelClosed(channel) + } + } switch self.connectionState { case .disconnected, .connected: @@ -182,8 +269,22 @@ final actor NewLambdaRuntimeClient: LambdaRuntimeClientProtocol { } } -// no need in locks since we validate only one request can run at a time -private final class LambdaChannelHandler { +extension NewLambdaRuntimeClient: LambdaChannelHandlerDelegate { + nonisolated func connectionErrorHappened(_ error: any Error, channel: any Channel) { + + } + + nonisolated func connectionWillClose(channel: any Channel) { + + } +} + +private protocol LambdaChannelHandlerDelegate { + func connectionWillClose(channel: any Channel) + func connectionErrorHappened(_ error: any Error, channel: any Channel) +} + +private final class LambdaChannelHandler { let nextInvocationPath = Consts.invocationURLPrefix + Consts.getNextInvocationURLSuffix enum State { @@ -191,11 +292,16 @@ private final class LambdaChannelHandler { case connected(ChannelHandlerContext, LambdaState) enum LambdaState { - case idle(previousRequestID: String?) + /// this is the "normal" state. Transitions to `waitingForNextInvocation` + case idle + /// this is the state while we wait for an invocation. A next call is running. + /// Transitions to `waitingForResponse` case waitingForNextInvocation(CheckedContinuation) - case waitingForResponse(requestID: String) - case sendingResponse(requestID: String) - case sentResponse(requestID: String, CheckedContinuation) + /// The invocation was forwarded to the handler and we wait for a response. + /// Transitions to `sendingResponse` or `sentResponse`. + case waitingForResponse + case sendingResponse + case sentResponse(CheckedContinuation) case closing } } @@ -204,8 +310,10 @@ private final class LambdaChannelHandler { private var lastError: Error? private var reusableErrorBuffer: ByteBuffer? private let logger: Logger + private let delegate: Delegate - init(logger: Logger) { + init(delegate: Delegate, logger: Logger) { + self.delegate = delegate self.logger = logger } @@ -223,34 +331,32 @@ private final class LambdaChannelHandler { .connected(_, .sentResponse), .connected(_, .waitingForNextInvocation), .connected(_, .waitingForResponse): - fatalError() + fatalError("Invalid state: \(self.state)") case .disconnected: - // TODO: throw error here - fatalError() + throw NewLambdaRuntimeError(code: .connectionToControlPlaneLost) } } - func reportError(isolation: isolated (any Actor)? = #isolation, _ error: any Error) async throws { + func reportError(isolation: isolated (any Actor)? = #isolation, _ error: any Error, requestID: String) async throws { switch self.state { - case .connected(_, .idle(.none)), - .connected(_, .waitingForNextInvocation): + case .connected(_, .waitingForNextInvocation): fatalError("Invalid state: \(self.state)") - case .connected(let context, .waitingForResponse(let requestID)): + case .connected(let context, .waitingForResponse): try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) in - self.state = .connected(context, .sentResponse(requestID: requestID, continuation)) + self.state = .connected(context, .sentResponse(continuation)) self.sendReportErrorRequest(requestID: requestID, error: error, context: context) } - case .connected(let context, .sendingResponse(let requestID)): + case .connected(let context, .sendingResponse): try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) in - self.state = .connected(context, .sentResponse(requestID: requestID, continuation)) + self.state = .connected(context, .sentResponse(continuation)) self.sendResponseStreamingFailure(error: error, context: context) } - case .connected(_, .idle(previousRequestID: .some(let requestID))), - .connected(_, .sentResponse(let requestID, _)): + case .connected(_, .idle), + .connected(_, .sentResponse): // The final response has already been sent. The only way to report the unhandled error // now is to log it. Normally this library never logs higher than debug, we make an // exception here, as there is no other way of reporting the error otherwise. @@ -262,62 +368,64 @@ private final class LambdaChannelHandler { ] ) - case .disconnected, .connected(_, .closing): - // TODO: throw error here - fatalError() + case .disconnected: + throw NewLambdaRuntimeError(code: .connectionToControlPlaneLost) + + case .connected(_, .closing): + throw NewLambdaRuntimeError(code: .connectionToControlPlaneGoingAway) } } - func writeResponseBodyPart(isolation: isolated (any Actor)? = #isolation, _ byteBuffer: ByteBuffer) async throws { + func writeResponseBodyPart(isolation: isolated (any Actor)? = #isolation, _ byteBuffer: ByteBuffer, requestID: String) async throws { switch self.state { - case .connected(_, .idle(.none)), - .connected(_, .waitingForNextInvocation): + case .connected(_, .waitingForNextInvocation): fatalError("Invalid state: \(self.state)") - case .connected(let context, .waitingForResponse(let requestID)): - self.state = .connected(context, .sendingResponse(requestID: requestID)) + case .connected(let context, .waitingForResponse): + self.state = .connected(context, .sendingResponse) try await self.sendResponseBodyPart(byteBuffer, sendHeadWithRequestID: requestID, context: context) case .connected(let context, .sendingResponse): try await self.sendResponseBodyPart(byteBuffer, sendHeadWithRequestID: nil, context: context) - case .connected(_, .idle(previousRequestID: .some(let requestID))), - .connected(_, .sentResponse(let requestID, _)): - // TODO: throw error here – user tries to write after the stream has been finished - fatalError() + case .connected(_, .idle), + .connected(_, .sentResponse): + throw NewLambdaRuntimeError(code: .writeAfterFinishHasBeenSent) - case .disconnected, .connected(_, .closing): - // TODO: throw error here - fatalError() + case .disconnected: + throw NewLambdaRuntimeError(code: .connectionToControlPlaneLost) + + case .connected(_, .closing): + throw NewLambdaRuntimeError(code: .connectionToControlPlaneGoingAway) } } - func finishResponseRequest(isolation: isolated (any Actor)? = #isolation, finalData: ByteBuffer?) async throws { + func finishResponseRequest(isolation: isolated (any Actor)? = #isolation, finalData: ByteBuffer?, requestID: String) async throws { switch self.state { - case .connected(_, .idle(.none)), + case .connected(_, .idle), .connected(_, .waitingForNextInvocation): fatalError("Invalid state: \(self.state)") - case .connected(let context, .waitingForResponse(let requestID)): + case .connected(let context, .waitingForResponse): try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) in - self.state = .connected(context, .sentResponse(requestID: requestID, continuation)) + self.state = .connected(context, .sentResponse(continuation)) self.sendResponseFinish(finalData, sendHeadWithRequestID: requestID, context: context) } - case .connected(let context, .sendingResponse(let requestID)): + case .connected(let context, .sendingResponse): try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) in - self.state = .connected(context, .sentResponse(requestID: requestID, continuation)) + self.state = .connected(context, .sentResponse(continuation)) self.sendResponseFinish(finalData, sendHeadWithRequestID: nil, context: context) } - case .connected(_, .idle(previousRequestID: .some(let requestID))), - .connected(_, .sentResponse(let requestID, _)): - // TODO: throw error here – user tries to write after the stream has been finished - fatalError() + case .connected(_, .sentResponse): + throw NewLambdaRuntimeError(code: .finishAfterFinishHasBeenSent) - case .disconnected, .connected(_, .closing): - // TODO: throw error here - fatalError() + case .disconnected: + throw NewLambdaRuntimeError(code: .connectionToControlPlaneLost) + + case .connected(_, .closing): + throw NewLambdaRuntimeError(code: .connectionToControlPlaneGoingAway) } } @@ -353,11 +461,12 @@ private final class LambdaChannelHandler { sendHeadWithRequestID: String?, context: ChannelHandlerContext ) { - if let requestID = sendHeadWithRequestID { - // TODO: This feels super expensive. We should be able to make this cheaper. requestIDs are fixed length - let url = Consts.invocationURLPrefix + "/" + requestID + Consts.postResponseURLSuffix + // TODO: This feels quite expensive. We should be able to make this cheaper. requestIDs are fixed length + let url = "\(Consts.invocationURLPrefix)/\(requestID)/\(Consts.postResponseURLSuffix)" + // If we have less than 6MB, we don't want to use the streaming API. If we have more + // than 6MB we must use the streaming mode. let headers: HTTPHeaders = if byteBuffer?.readableBytes ?? 0 < 6_000_000 { [ @@ -400,8 +509,8 @@ private final class LambdaChannelHandler { } private func sendReportErrorRequest(requestID: String, error: any Error, context: ChannelHandlerContext) { - // TODO: This feels super expensive. We should be able to make this cheaper. requestIDs are fixed length - let url = Consts.invocationURLPrefix + "/" + requestID + Consts.postErrorURLSuffix + // TODO: This feels quite expensive. We should be able to make this cheaper. requestIDs are fixed length + let url = "\(Consts.invocationURLPrefix)/\(requestID)/\(Consts.postErrorURLSuffix)" let httpRequest = HTTPRequestHead( version: .http1_1, @@ -450,14 +559,14 @@ extension LambdaChannelHandler: ChannelInboundHandler { func handlerAdded(context: ChannelHandlerContext) { if context.channel.isActive { - self.state = .connected(context, .idle(previousRequestID: nil)) + self.state = .connected(context, .idle) } } func channelActive(context: ChannelHandlerContext) { switch self.state { case .disconnected: - self.state = .connected(context, .idle(previousRequestID: nil)) + self.state = .connected(context, .idle) case .connected: break } @@ -470,16 +579,16 @@ extension LambdaChannelHandler: ChannelInboundHandler { case .connected(let context, .waitingForNextInvocation(let continuation)): do { let metadata = try InvocationMetadata(headers: response.head.headers) - self.state = .connected(context, .waitingForResponse(requestID: metadata.requestID)) + self.state = .connected(context, .waitingForResponse) continuation.resume(returning: Invocation(metadata: metadata, event: response.body ?? ByteBuffer())) } catch { self.state = .connected(context, .closing) - fatalError("TODO: fail continuation with better error") + continuation.resume(throwing: NewLambdaRuntimeError(code: .invocationMissingMetadata, underlying: error)) } - case .connected(let context, .sentResponse(let requestID, let continuation)): + case .connected(let context, .sentResponse(let continuation)): if response.head.status == .accepted { - self.state = .connected(context, .idle(previousRequestID: requestID)) + self.state = .connected(context, .idle) continuation.resume() } diff --git a/Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeError.swift b/Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeError.swift index 88c33393..844f54d3 100644 --- a/Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeError.swift +++ b/Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeError.swift @@ -14,6 +14,10 @@ struct NewLambdaRuntimeError: Error { enum Code { + case connectionToControlPlaneLost + case connectionToControlPlaneGoingAway + case invocationMissingMetadata + case writeAfterFinishHasBeenSent case finishAfterFinishHasBeenSent case lostConnectionToControlPlane @@ -22,5 +26,6 @@ struct NewLambdaRuntimeError: Error { } var code: Code + var underlying: (any Error)? } diff --git a/Tests/AWSLambdaRuntimeCoreTests/ActorTest.swift b/Tests/AWSLambdaRuntimeCoreTests/ActorTest.swift new file mode 100644 index 00000000..29aab568 --- /dev/null +++ b/Tests/AWSLambdaRuntimeCoreTests/ActorTest.swift @@ -0,0 +1,43 @@ +// +// ActorTest.swift +// swift-aws-lambda-runtime +// +// Created by Fabian Fett on 03.09.24. +// + +import Testing +import NIOCore +import NIOPosix + +final actor EventLoopActor { + nonisolated let unownedExecutor: UnownedSerialExecutor + private let eventLoop: any EventLoop + + init(eventLoop: any EventLoop) { + self.unownedExecutor = eventLoop.executor.asUnownedSerialExecutor() + self.eventLoop = eventLoop + } + + func haha() { + _ = self.eventLoop.execute { + self.assumeIsolated { this in // crashes here! + print(this.eventLoop) + } + } + } +} + +@Suite +struct ActorTest { + + @Test + func testEventLoopAsCustomSerialExecutor() async { + let elg = MultiThreadedEventLoopGroup(numberOfThreads: 1) + let actor = EventLoopActor(eventLoop: elg.next()) + + await actor.haha() + + try? await elg.shutdownGracefully() + } +} + diff --git a/Tests/AWSLambdaRuntimeCoreTests/LambdaTest.swift b/Tests/AWSLambdaRuntimeCoreTests/LambdaTest.swift index 9379f5ee..3eb15fab 100644 --- a/Tests/AWSLambdaRuntimeCoreTests/LambdaTest.swift +++ b/Tests/AWSLambdaRuntimeCoreTests/LambdaTest.swift @@ -222,7 +222,7 @@ class LambdaTest: XCTestCase { cognitoIdentity: nil, clientContext: nil, logger: Logger(label: "test"), - eventLoop: MultiThreadedEventLoopGroup(numberOfThreads: 1).next(), + eventLoop: NIOSingletons.posixEventLoopGroup.next(), allocator: ByteBufferAllocator() ) XCTAssertGreaterThan(context.deadline, .now()) @@ -250,7 +250,7 @@ class LambdaTest: XCTestCase { cognitoIdentity: nil, clientContext: nil, logger: Logger(label: "test"), - eventLoop: MultiThreadedEventLoopGroup(numberOfThreads: 1).next(), + eventLoop: NIOSingletons.posixEventLoopGroup.next(), allocator: ByteBufferAllocator() ) XCTAssertLessThanOrEqual(context.getRemainingTime(), .seconds(1)) diff --git a/Tests/AWSLambdaRuntimeCoreTests/MockLambdaServer.swift b/Tests/AWSLambdaRuntimeCoreTests/MockLambdaServer.swift index a0859218..1d56da69 100644 --- a/Tests/AWSLambdaRuntimeCoreTests/MockLambdaServer.swift +++ b/Tests/AWSLambdaRuntimeCoreTests/MockLambdaServer.swift @@ -53,7 +53,7 @@ final class MockLambdaServer { private var shutdown = false init(behavior: LambdaServerBehavior, host: String = "127.0.0.1", port: Int = 7000, keepAlive: Bool = true) { - self.group = MultiThreadedEventLoopGroup(numberOfThreads: 1) + self.group = NIOSingletons.posixEventLoopGroup self.behavior = behavior self.host = host self.port = port diff --git a/Tests/AWSLambdaRuntimeCoreTests/Utils.swift b/Tests/AWSLambdaRuntimeCoreTests/Utils.swift index 8bbd4730..ef6b78a8 100644 --- a/Tests/AWSLambdaRuntimeCoreTests/Utils.swift +++ b/Tests/AWSLambdaRuntimeCoreTests/Utils.swift @@ -62,7 +62,7 @@ func runLambda( behavior: LambdaServerBehavior, handlerProvider: @escaping (LambdaInitializationContext) async throws -> Handler ) throws { - let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1) + let eventLoopGroup = NIOSingletons.posixEventLoopGroup.next() try runLambda( behavior: behavior, handlerProvider: { context in @@ -79,7 +79,7 @@ func runLambda( behavior: LambdaServerBehavior, handlerProvider: @escaping (LambdaInitializationContext) -> EventLoopFuture ) throws { - let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1) + let eventLoopGroup = NIOSingletons.posixEventLoopGroup.next() defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) } let logger = Logger(label: "TestLogger") let server = MockLambdaServer(behavior: behavior, port: 0) From 1b721c19928c0c0c082691243e56eddc630cb755 Mon Sep 17 00:00:00 2001 From: Fabian Fett Date: Tue, 3 Sep 2024 20:17:06 +0200 Subject: [PATCH 07/16] Update to latest NIO --- Package.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Package.swift b/Package.swift index 63d83b7b..25cbdd4f 100644 --- a/Package.swift +++ b/Package.swift @@ -23,7 +23,7 @@ let package = Package( .library(name: "AWSLambdaTesting", targets: ["AWSLambdaTesting"]), ], dependencies: [ - .package(url: "https://github.com/apple/swift-nio.git", .upToNextMajor(from: "2.67.0")), + .package(url: "https://github.com/apple/swift-nio.git", .upToNextMajor(from: "2.72.0")), .package(url: "https://github.com/apple/swift-log.git", .upToNextMajor(from: "1.5.4")), .package(url: "https://github.com/apple/swift-docc-plugin.git", from: "1.0.0"), .package(url: "https://github.com/apple/swift-testing.git", branch: "swift-DEVELOPMENT-SNAPSHOT-2024-08-29-a"), From e5c0e29d0366886fe17d500850e6a31914a90b29 Mon Sep 17 00:00:00 2001 From: Fabian Fett Date: Tue, 3 Sep 2024 20:17:48 +0200 Subject: [PATCH 08/16] Delete now unused test --- .../AWSLambdaRuntimeCoreTests/ActorTest.swift | 43 ------------------- 1 file changed, 43 deletions(-) delete mode 100644 Tests/AWSLambdaRuntimeCoreTests/ActorTest.swift diff --git a/Tests/AWSLambdaRuntimeCoreTests/ActorTest.swift b/Tests/AWSLambdaRuntimeCoreTests/ActorTest.swift deleted file mode 100644 index 29aab568..00000000 --- a/Tests/AWSLambdaRuntimeCoreTests/ActorTest.swift +++ /dev/null @@ -1,43 +0,0 @@ -// -// ActorTest.swift -// swift-aws-lambda-runtime -// -// Created by Fabian Fett on 03.09.24. -// - -import Testing -import NIOCore -import NIOPosix - -final actor EventLoopActor { - nonisolated let unownedExecutor: UnownedSerialExecutor - private let eventLoop: any EventLoop - - init(eventLoop: any EventLoop) { - self.unownedExecutor = eventLoop.executor.asUnownedSerialExecutor() - self.eventLoop = eventLoop - } - - func haha() { - _ = self.eventLoop.execute { - self.assumeIsolated { this in // crashes here! - print(this.eventLoop) - } - } - } -} - -@Suite -struct ActorTest { - - @Test - func testEventLoopAsCustomSerialExecutor() async { - let elg = MultiThreadedEventLoopGroup(numberOfThreads: 1) - let actor = EventLoopActor(eventLoop: elg.next()) - - await actor.haha() - - try? await elg.shutdownGracefully() - } -} - From 13a1d7b56dbbd33cee2d182e946ed99de957e22f Mon Sep 17 00:00:00 2001 From: Fabian Fett Date: Tue, 3 Sep 2024 20:21:16 +0200 Subject: [PATCH 09/16] Tests work again --- Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeClient.swift | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeClient.swift b/Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeClient.swift index fb90ece8..4bddfdb1 100644 --- a/Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeClient.swift +++ b/Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeClient.swift @@ -463,7 +463,7 @@ private final class LambdaChannelHandler { ) { if let requestID = sendHeadWithRequestID { // TODO: This feels quite expensive. We should be able to make this cheaper. requestIDs are fixed length - let url = "\(Consts.invocationURLPrefix)/\(requestID)/\(Consts.postResponseURLSuffix)" + let url = "\(Consts.invocationURLPrefix)/\(requestID)\(Consts.postResponseURLSuffix)" // If we have less than 6MB, we don't want to use the streaming API. If we have more // than 6MB we must use the streaming mode. @@ -510,7 +510,7 @@ private final class LambdaChannelHandler { private func sendReportErrorRequest(requestID: String, error: any Error, context: ChannelHandlerContext) { // TODO: This feels quite expensive. We should be able to make this cheaper. requestIDs are fixed length - let url = "\(Consts.invocationURLPrefix)/\(requestID)/\(Consts.postErrorURLSuffix)" + let url = "\(Consts.invocationURLPrefix)/\(requestID)\(Consts.postErrorURLSuffix)" let httpRequest = HTTPRequestHead( version: .http1_1, From e6a5f72fee91f864b77f750aac9c46d3f5b915b7 Mon Sep 17 00:00:00 2001 From: Fabian Fett Date: Tue, 3 Sep 2024 20:35:00 +0200 Subject: [PATCH 10/16] Fix tests --- Tests/AWSLambdaRuntimeCoreTests/Utils.swift | 1 - 1 file changed, 1 deletion(-) diff --git a/Tests/AWSLambdaRuntimeCoreTests/Utils.swift b/Tests/AWSLambdaRuntimeCoreTests/Utils.swift index ef6b78a8..41a2552f 100644 --- a/Tests/AWSLambdaRuntimeCoreTests/Utils.swift +++ b/Tests/AWSLambdaRuntimeCoreTests/Utils.swift @@ -80,7 +80,6 @@ func runLambda( handlerProvider: @escaping (LambdaInitializationContext) -> EventLoopFuture ) throws { let eventLoopGroup = NIOSingletons.posixEventLoopGroup.next() - defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) } let logger = Logger(label: "TestLogger") let server = MockLambdaServer(behavior: behavior, port: 0) let port = try server.start().wait() From 4c25eb7dfeaacc9fe16df36115064f73a08edc2c Mon Sep 17 00:00:00 2001 From: Fabian Fett Date: Tue, 3 Sep 2024 20:38:16 +0200 Subject: [PATCH 11/16] swift format --- .../NewLambdaRuntimeClient.swift | 36 ++++++++++++++----- 1 file changed, 27 insertions(+), 9 deletions(-) diff --git a/Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeClient.swift b/Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeClient.swift index 4bddfdb1..eaaf64d2 100644 --- a/Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeClient.swift +++ b/Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeClient.swift @@ -113,9 +113,9 @@ final actor NewLambdaRuntimeClient: LambdaRuntimeClientProtocol { return (invocation, Writer(runtimeClient: self)) case .waitingForNextInvocation, - .waitingForResponse, - .sendingResponse, - .sentResponse: + .waitingForResponse, + .sendingResponse, + .sentResponse: fatalError("Invalid state: \(self.lambdaState)") } @@ -219,7 +219,9 @@ final actor NewLambdaRuntimeClient: LambdaRuntimeClientProtocol { try channel.pipeline.syncOperations.addHandler( NIOHTTPClientResponseAggregator(maxContentLength: 6 * 1024 * 1024) ) - try channel.pipeline.syncOperations.addHandler(LambdaChannelHandler(delegate: self, logger: self.logger)) + try channel.pipeline.syncOperations.addHandler( + LambdaChannelHandler(delegate: self, logger: self.logger) + ) return channel.eventLoop.makeSucceededFuture(()) } catch { return channel.eventLoop.makeFailedFuture(error) @@ -230,7 +232,9 @@ final actor NewLambdaRuntimeClient: LambdaRuntimeClientProtocol { // connect directly via socket address to avoid happy eyeballs (perf) let address = try SocketAddress(ipAddress: self.configuration.ip, port: self.configuration.port) let channel = try await bootstrap.connect(to: address).get() - let handler = try channel.pipeline.syncOperations.handler(type: LambdaChannelHandler.self) + let handler = try channel.pipeline.syncOperations.handler( + type: LambdaChannelHandler.self + ) channel.closeFuture.whenComplete { result in self.eventLoop.preconditionInEventLoop() self.assumeIsolated { runtimeClient in @@ -338,7 +342,11 @@ private final class LambdaChannelHandler { } } - func reportError(isolation: isolated (any Actor)? = #isolation, _ error: any Error, requestID: String) async throws { + func reportError( + isolation: isolated (any Actor)? = #isolation, + _ error: any Error, + requestID: String + ) async throws { switch self.state { case .connected(_, .waitingForNextInvocation): fatalError("Invalid state: \(self.state)") @@ -376,7 +384,11 @@ private final class LambdaChannelHandler { } } - func writeResponseBodyPart(isolation: isolated (any Actor)? = #isolation, _ byteBuffer: ByteBuffer, requestID: String) async throws { + func writeResponseBodyPart( + isolation: isolated (any Actor)? = #isolation, + _ byteBuffer: ByteBuffer, + requestID: String + ) async throws { switch self.state { case .connected(_, .waitingForNextInvocation): fatalError("Invalid state: \(self.state)") @@ -400,7 +412,11 @@ private final class LambdaChannelHandler { } } - func finishResponseRequest(isolation: isolated (any Actor)? = #isolation, finalData: ByteBuffer?, requestID: String) async throws { + func finishResponseRequest( + isolation: isolated (any Actor)? = #isolation, + finalData: ByteBuffer?, + requestID: String + ) async throws { switch self.state { case .connected(_, .idle), .connected(_, .waitingForNextInvocation): @@ -583,7 +599,9 @@ extension LambdaChannelHandler: ChannelInboundHandler { continuation.resume(returning: Invocation(metadata: metadata, event: response.body ?? ByteBuffer())) } catch { self.state = .connected(context, .closing) - continuation.resume(throwing: NewLambdaRuntimeError(code: .invocationMissingMetadata, underlying: error)) + continuation.resume( + throwing: NewLambdaRuntimeError(code: .invocationMissingMetadata, underlying: error) + ) } case .connected(let context, .sentResponse(let continuation)): From f20674d0368c365d06a3f96594b00729130d3dbe Mon Sep 17 00:00:00 2001 From: Fabian Fett Date: Wed, 4 Sep 2024 13:47:05 +0200 Subject: [PATCH 12/16] Progress on observing connection close --- .../NewLambdaRuntimeClient.swift | 105 +++++++++++++++--- .../NewLambdaRuntimeError.swift | 2 + 2 files changed, 90 insertions(+), 17 deletions(-) diff --git a/Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeClient.swift b/Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeClient.swift index eaaf64d2..a41f44e8 100644 --- a/Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeClient.swift +++ b/Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeClient.swift @@ -16,7 +16,6 @@ import Logging import NIOCore import NIOHTTP1 import NIOPosix -import _NIOBase64 final actor NewLambdaRuntimeClient: LambdaRuntimeClientProtocol { nonisolated let unownedExecutor: UnownedSerialExecutor @@ -69,11 +68,22 @@ final actor NewLambdaRuntimeClient: LambdaRuntimeClientProtocol { case sentResponse(requestID: String) } + enum ClosingState { + case notClosing + case closing(CheckedContinuation) + } + private let eventLoop: any EventLoop private let logger: Logger private let configuration: Configuration + private var connectionState: ConnectionState = .disconnected private var lambdaState: LambdaState = .idle(previousRequestID: nil) + private var closingState: ClosingState = .notClosing + + // connections that are currently being closed. In the `run` method we must await all of them + // being fully closed before we can return from it. + private var closingConnections: [any Channel] = [] static func withRuntimeClient( configuration: Configuration, @@ -89,6 +99,8 @@ final actor NewLambdaRuntimeClient: LambdaRuntimeClientProtocol { result = .failure(error) } + await runtime.close() + //try? await runtime.close() return try result.get() } @@ -100,6 +112,31 @@ final actor NewLambdaRuntimeClient: LambdaRuntimeClientProtocol { self.logger = logger } + private func close() async { + guard case .notClosing = self.closingState else { + return + } + await withCheckedContinuation { continuation in + self.closingState = .closing(continuation) + + switch self.connectionState { + case .disconnected: + break + + case .connecting(let continuations): + for continuation in continuations { + continuation.resume(throwing: NewLambdaRuntimeError(code: .closingRuntimeClient)) + } + self.connectionState = .connecting([]) + + case .connected(let channel, let lambdaChannelHandler): + channel.clo + } + } + + + } + func nextInvocation() async throws -> (Invocation, Writer) { switch self.lambdaState { case .idle: @@ -190,7 +227,20 @@ final actor NewLambdaRuntimeClient: LambdaRuntimeClientProtocol { } private func channelClosed(_ channel: any Channel) { - // TODO: Fill out + switch self.connectionState { + case .disconnected: + break + + case .connecting(let array): + self.connectionState = .disconnected + + for continuation in array { + continuation.resume(throwing: NewLambdaRuntimeError(code: .lostConnectionToControlPlane)) + } + + case .connected: + self.connectionState = .disconnected + } } private func makeOrGetConnection() async throws -> LambdaChannelHandler { @@ -227,6 +277,7 @@ final actor NewLambdaRuntimeClient: LambdaRuntimeClientProtocol { return channel.eventLoop.makeFailedFuture(error) } } + .connectTimeout(.seconds(2)) do { // connect directly via socket address to avoid happy eyeballs (perf) @@ -279,6 +330,35 @@ extension NewLambdaRuntimeClient: LambdaChannelHandlerDelegate { } nonisolated func connectionWillClose(channel: any Channel) { + self.assumeIsolated { isolated in + switch isolated.connectionState { + case .disconnected: + // this case should never happen. But whatever + if channel.isActive { + isolated.closingConnections.append(channel) + } + + case .connecting(let continuations): + // this case should never happen. But whatever + if channel.isActive { + isolated.closingConnections.append(channel) + } + + for continuation in continuations { + continuation.resume(throwing: NewLambdaRuntimeError(code: .connectionToControlPlaneLost)) + } + + case .connected(let stateChannel, _): + guard channel === stateChannel else { + isolated.closingConnections.append(channel) + return + } + + isolated.connectionState = .disconnected + + + } + } } } @@ -288,7 +368,7 @@ private protocol LambdaChannelHandlerDelegate { func connectionErrorHappened(_ error: any Error, channel: any Channel) } -private final class LambdaChannelHandler { +private final class LambdaChannelHandler { let nextInvocationPath = Consts.invocationURLPrefix + Consts.getNextInvocationURLSuffix enum State { @@ -652,27 +732,18 @@ extension LambdaChannelHandler: ChannelInboundHandler { func errorCaught(context: ChannelHandlerContext, error: Error) { // pending responses will fail with lastError in channelInactive since we are calling context.close + self.delegate.connectionErrorHappened(error, channel: context.channel) + self.lastError = error context.channel.close(promise: nil) } func channelInactive(context: ChannelHandlerContext) { // fail any pending responses with last error or assume peer disconnected - context.fireChannelInactive() - // switch self.state { - // case .idle: - // break - // - // case .running(let promise, let timeout): - // self.state = .idle - // timeout?.cancel() - // promise.fail(self.lastError ?? HTTPClient.Errors.connectionResetByPeer) - // - // case .waitForConnectionClose(let response, let promise): - // self.state = .idle - // promise.succeed(response) - // } + // we don't need to forward channelInactive to the delegate, as the delegate observes the + // closeFuture + context.fireChannelInactive() } } diff --git a/Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeError.swift b/Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeError.swift index 844f54d3..b95a5587 100644 --- a/Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeError.swift +++ b/Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeError.swift @@ -14,6 +14,8 @@ struct NewLambdaRuntimeError: Error { enum Code { + case closingRuntimeClient + case connectionToControlPlaneLost case connectionToControlPlaneGoingAway case invocationMissingMetadata From cf224a5d70aa5c1f0cba597cd98da8087a779bd2 Mon Sep 17 00:00:00 2001 From: Fabian Fett Date: Wed, 4 Sep 2024 14:18:36 +0200 Subject: [PATCH 13/16] Fix after rebase --- .../NewLambdaRuntimeClient.swift | 42 +++++++++++++------ 1 file changed, 29 insertions(+), 13 deletions(-) diff --git a/Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeClient.swift b/Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeClient.swift index a41f44e8..023d7880 100644 --- a/Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeClient.swift +++ b/Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeClient.swift @@ -25,7 +25,7 @@ final actor NewLambdaRuntimeClient: LambdaRuntimeClientProtocol { var port: Int } - struct Writer: LambdaResponseStreamWriter { + struct Writer: LambdaRuntimeClientResponseStreamWriter { private var runtimeClient: NewLambdaRuntimeClient fileprivate init(runtimeClient: NewLambdaRuntimeClient) { @@ -71,6 +71,7 @@ final actor NewLambdaRuntimeClient: LambdaRuntimeClientProtocol { enum ClosingState { case notClosing case closing(CheckedContinuation) + case closed } private let eventLoop: any EventLoop @@ -121,7 +122,9 @@ final actor NewLambdaRuntimeClient: LambdaRuntimeClientProtocol { switch self.connectionState { case .disconnected: - break + if self.closingConnections.isEmpty { + return continuation.resume() + } case .connecting(let continuations): for continuation in continuations { @@ -129,12 +132,10 @@ final actor NewLambdaRuntimeClient: LambdaRuntimeClientProtocol { } self.connectionState = .connecting([]) - case .connected(let channel, let lambdaChannelHandler): - channel.clo + case .connected(let channel, _): + channel.close(mode: .all, promise: nil) } } - - } func nextInvocation() async throws -> (Invocation, Writer) { @@ -227,19 +228,35 @@ final actor NewLambdaRuntimeClient: LambdaRuntimeClientProtocol { } private func channelClosed(_ channel: any Channel) { - switch self.connectionState { - case .disconnected: - break + switch (self.connectionState, self.closingState) { + case (.disconnected, _), + (_, .closed): + fatalError("Invalid state: \(self.connectionState), \(self.closingState)") - case .connecting(let array): + case (.connecting(let array), .notClosing): self.connectionState = .disconnected - for continuation in array { continuation.resume(throwing: NewLambdaRuntimeError(code: .lostConnectionToControlPlane)) } - case .connected: + case (.connecting(let array), .closing(let continuation)): + self.connectionState = .disconnected + precondition(array.isEmpty, "If we are closing we should have failed all connection attempts already") + if self.closingConnections.isEmpty { + self.closingState = .closed + continuation.resume() + } + + case (.connected, .notClosing): + self.connectionState = .disconnected + + case (.connected, .closing(let continuation)): self.connectionState = .disconnected + + if self.closingConnections.isEmpty { + self.closingState = .closed + continuation.resume() + } } } @@ -356,7 +373,6 @@ extension NewLambdaRuntimeClient: LambdaChannelHandlerDelegate { isolated.connectionState = .disconnected - } } From d6920c36ff1cb3b2b8b4fa834eb726e99b639338 Mon Sep 17 00:00:00 2001 From: Fabian Fett Date: Wed, 4 Sep 2024 14:43:59 +0200 Subject: [PATCH 14/16] better close support --- .../NewLambdaRuntimeClient.swift | 97 ++++++++++--------- 1 file changed, 53 insertions(+), 44 deletions(-) diff --git a/Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeClient.swift b/Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeClient.swift index 023d7880..a1569452 100644 --- a/Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeClient.swift +++ b/Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeClient.swift @@ -390,6 +390,7 @@ private final class LambdaChannelHandler enum State { case disconnected case connected(ChannelHandlerContext, LambdaState) + case closing enum LambdaState { /// this is the "normal" state. Transitions to `waitingForNextInvocation` @@ -402,7 +403,6 @@ private final class LambdaChannelHandler case waitingForResponse case sendingResponse case sentResponse(CheckedContinuation) - case closing } } @@ -426,11 +426,11 @@ private final class LambdaChannelHandler self.sendNextRequest(context: context) } - case .connected(_, .closing), - .connected(_, .sendingResponse), + case .connected(_, .sendingResponse), .connected(_, .sentResponse), .connected(_, .waitingForNextInvocation), - .connected(_, .waitingForResponse): + .connected(_, .waitingForResponse), + .closing: fatalError("Invalid state: \(self.state)") case .disconnected: @@ -475,7 +475,7 @@ private final class LambdaChannelHandler case .disconnected: throw NewLambdaRuntimeError(code: .connectionToControlPlaneLost) - case .connected(_, .closing): + case .closing: throw NewLambdaRuntimeError(code: .connectionToControlPlaneGoingAway) } } @@ -503,7 +503,7 @@ private final class LambdaChannelHandler case .disconnected: throw NewLambdaRuntimeError(code: .connectionToControlPlaneLost) - case .connected(_, .closing): + case .closing: throw NewLambdaRuntimeError(code: .connectionToControlPlaneGoingAway) } } @@ -536,7 +536,7 @@ private final class LambdaChannelHandler case .disconnected: throw NewLambdaRuntimeError(code: .connectionToControlPlaneLost) - case .connected(_, .closing): + case .closing: throw NewLambdaRuntimeError(code: .connectionToControlPlaneGoingAway) } } @@ -681,12 +681,50 @@ extension LambdaChannelHandler: ChannelInboundHandler { self.state = .connected(context, .idle) case .connected: break + case .closing: + fatalError("Invalid state: \(self.state)") } } func channelRead(context: ChannelHandlerContext, data: NIOAny) { let response = unwrapInboundIn(data) + // As defined in RFC 7230 Section 6.3: + // HTTP/1.1 defaults to the use of "persistent connections", allowing + // multiple requests and responses to be carried over a single + // connection. The "close" connection option is used to signal that a + // connection will not persist after the current request/response. HTTP + // implementations SHOULD support persistent connections. + // + // That's why we only assume the connection shall be closed if we receive + // a "connection = close" header. + let serverCloseConnection = + response.head.headers["connection"].contains(where: { $0.lowercased() == "close" }) + + let closeConnection = serverCloseConnection || response.head.version != .http1_1 + + if closeConnection { + // If we were succeeding the request promise here directly and closing the connection + // after succeeding the promise we may run into a race condition: + // + // The lambda runtime will ask for the next work item directly after a succeeded post + // response request. The desire for the next work item might be faster than the attempt + // to close the connection. This will lead to a situation where we try to the connection + // but the next request has already been scheduled on the connection that we want to + // close. For this reason we postpone succeeding the promise until the connection has + // been closed. This codepath will only be hit in the very, very unlikely event of the + // Lambda control plane demanding to close connection. (It's more or less only + // implemented to support http1.1 correctly.) This behavior is ensured with the test + // `LambdaTest.testNoKeepAliveServer`. + self.state = .closing + self.delegate.connectionWillClose(channel: context.channel) + context.close(promise: nil) + } else { + self.state = .connected(context, .idle) + } + + // handle response content + switch self.state { case .connected(let context, .waitingForNextInvocation(let continuation)): do { @@ -694,7 +732,10 @@ extension LambdaChannelHandler: ChannelInboundHandler { self.state = .connected(context, .waitingForResponse) continuation.resume(returning: Invocation(metadata: metadata, event: response.body ?? ByteBuffer())) } catch { - self.state = .connected(context, .closing) + self.state = .closing + + self.delegate.connectionWillClose(channel: context.channel) + context.close(promise: nil) continuation.resume( throwing: NewLambdaRuntimeError(code: .invocationMissingMetadata, underlying: error) ) @@ -704,46 +745,14 @@ extension LambdaChannelHandler: ChannelInboundHandler { if response.head.status == .accepted { self.state = .connected(context, .idle) continuation.resume() + } else { + self.state = .connected(context, .idle) + continuation.resume(throwing: NewLambdaRuntimeError(code: .unexpectedStatusCodeForRequest)) } - case .disconnected, .connected(_, _): + case .disconnected, .closing, .connected(_, _): break } - - // // As defined in RFC 7230 Section 6.3: - // // HTTP/1.1 defaults to the use of "persistent connections", allowing - // // multiple requests and responses to be carried over a single - // // connection. The "close" connection option is used to signal that a - // // connection will not persist after the current request/response. HTTP - // // implementations SHOULD support persistent connections. - // // - // // That's why we only assume the connection shall be closed if we receive - // // a "connection = close" header. - // let serverCloseConnection = - // response.head.headers["connection"].contains(where: { $0.lowercased() == "close" }) - // - // let closeConnection = serverCloseConnection || response.head.version != .http1_1 - // - // if closeConnection { - // // If we were succeeding the request promise here directly and closing the connection - // // after succeeding the promise we may run into a race condition: - // // - // // The lambda runtime will ask for the next work item directly after a succeeded post - // // response request. The desire for the next work item might be faster than the attempt - // // to close the connection. This will lead to a situation where we try to the connection - // // but the next request has already been scheduled on the connection that we want to - // // close. For this reason we postpone succeeding the promise until the connection has - // // been closed. This codepath will only be hit in the very, very unlikely event of the - // // Lambda control plane demanding to close connection. (It's more or less only - // // implemented to support http1.1 correctly.) This behavior is ensured with the test - // // `LambdaTest.testNoKeepAliveServer`. - // self.state = .waitForConnectionClose(httpResponse, promise) - // _ = context.channel.close() - // return - // } else { - // self.state = .idle - // promise.succeed(httpResponse) - // } } func errorCaught(context: ChannelHandlerContext, error: Error) { From 22634b4a9a5604c4a2f66de5de75715c301b31db Mon Sep 17 00:00:00 2001 From: Fabian Fett Date: Wed, 4 Sep 2024 15:10:35 +0200 Subject: [PATCH 15/16] Fix close handling --- .../NewLambdaRuntimeClient.swift | 64 +++++++++---------- 1 file changed, 31 insertions(+), 33 deletions(-) diff --git a/Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeClient.swift b/Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeClient.swift index a1569452..5d91fd4c 100644 --- a/Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeClient.swift +++ b/Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeClient.swift @@ -689,6 +689,37 @@ extension LambdaChannelHandler: ChannelInboundHandler { func channelRead(context: ChannelHandlerContext, data: NIOAny) { let response = unwrapInboundIn(data) + // handle response content + + switch self.state { + case .connected(let context, .waitingForNextInvocation(let continuation)): + do { + let metadata = try InvocationMetadata(headers: response.head.headers) + self.state = .connected(context, .waitingForResponse) + continuation.resume(returning: Invocation(metadata: metadata, event: response.body ?? ByteBuffer())) + } catch { + self.state = .closing + + self.delegate.connectionWillClose(channel: context.channel) + context.close(promise: nil) + continuation.resume( + throwing: NewLambdaRuntimeError(code: .invocationMissingMetadata, underlying: error) + ) + } + + case .connected(let context, .sentResponse(let continuation)): + if response.head.status == .accepted { + self.state = .connected(context, .idle) + continuation.resume() + } else { + self.state = .connected(context, .idle) + continuation.resume(throwing: NewLambdaRuntimeError(code: .unexpectedStatusCodeForRequest)) + } + + case .disconnected, .closing, .connected(_, _): + break + } + // As defined in RFC 7230 Section 6.3: // HTTP/1.1 defaults to the use of "persistent connections", allowing // multiple requests and responses to be carried over a single @@ -719,39 +750,6 @@ extension LambdaChannelHandler: ChannelInboundHandler { self.state = .closing self.delegate.connectionWillClose(channel: context.channel) context.close(promise: nil) - } else { - self.state = .connected(context, .idle) - } - - // handle response content - - switch self.state { - case .connected(let context, .waitingForNextInvocation(let continuation)): - do { - let metadata = try InvocationMetadata(headers: response.head.headers) - self.state = .connected(context, .waitingForResponse) - continuation.resume(returning: Invocation(metadata: metadata, event: response.body ?? ByteBuffer())) - } catch { - self.state = .closing - - self.delegate.connectionWillClose(channel: context.channel) - context.close(promise: nil) - continuation.resume( - throwing: NewLambdaRuntimeError(code: .invocationMissingMetadata, underlying: error) - ) - } - - case .connected(let context, .sentResponse(let continuation)): - if response.head.status == .accepted { - self.state = .connected(context, .idle) - continuation.resume() - } else { - self.state = .connected(context, .idle) - continuation.resume(throwing: NewLambdaRuntimeError(code: .unexpectedStatusCodeForRequest)) - } - - case .disconnected, .closing, .connected(_, _): - break } } From 03b251bd000420002cd193848bf9b32beeec9e50 Mon Sep 17 00:00:00 2001 From: Fabian Fett Date: Wed, 4 Sep 2024 15:49:07 +0200 Subject: [PATCH 16/16] fix connection closure --- .../NewLambdaRuntimeClient.swift | 34 +++++++++++++++-- .../NewLambdaRuntimeClientTests.swift | 38 ++++--------------- 2 files changed, 39 insertions(+), 33 deletions(-) diff --git a/Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeClient.swift b/Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeClient.swift index 5d91fd4c..71839163 100644 --- a/Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeClient.swift +++ b/Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeClient.swift @@ -114,6 +114,8 @@ final actor NewLambdaRuntimeClient: LambdaRuntimeClientProtocol { } private func close() async { + self.logger.trace("Close lambda runtime client") + guard case .notClosing = self.closingState else { return } @@ -229,10 +231,24 @@ final actor NewLambdaRuntimeClient: LambdaRuntimeClientProtocol { private func channelClosed(_ channel: any Channel) { switch (self.connectionState, self.closingState) { - case (.disconnected, _), - (_, .closed): + case (_, .closed): fatalError("Invalid state: \(self.connectionState), \(self.closingState)") + case (.disconnected, .notClosing): + if let index = self.closingConnections.firstIndex(where: { $0 === channel }) { + self.closingConnections.remove(at: index) + } + + case (.disconnected, .closing(let continuation)): + if let index = self.closingConnections.firstIndex(where: { $0 === channel }) { + self.closingConnections.remove(at: index) + } + + if self.closingConnections.isEmpty { + self.closingState = .closed + continuation.resume() + } + case (.connecting(let array), .notClosing): self.connectionState = .disconnected for continuation in array { @@ -303,8 +319,14 @@ final actor NewLambdaRuntimeClient: LambdaRuntimeClientProtocol { let handler = try channel.pipeline.syncOperations.handler( type: LambdaChannelHandler.self ) + self.logger.trace( + "Connection to control plane created", + metadata: [ + "lambda_port": "\(self.configuration.port)", + "lambda_ip": "\(self.configuration.ip)", + ] + ) channel.closeFuture.whenComplete { result in - self.eventLoop.preconditionInEventLoop() self.assumeIsolated { runtimeClient in runtimeClient.channelClosed(channel) } @@ -754,6 +776,12 @@ extension LambdaChannelHandler: ChannelInboundHandler { } func errorCaught(context: ChannelHandlerContext, error: Error) { + self.logger.trace( + "Channel error caught", + metadata: [ + "error": "\(error)" + ] + ) // pending responses will fail with lastError in channelInactive since we are calling context.close self.delegate.connectionErrorHappened(error, channel: context.channel) diff --git a/Tests/AWSLambdaRuntimeCoreTests/NewLambdaRuntimeClientTests.swift b/Tests/AWSLambdaRuntimeCoreTests/NewLambdaRuntimeClientTests.swift index 31cbeec8..023c13a0 100644 --- a/Tests/AWSLambdaRuntimeCoreTests/NewLambdaRuntimeClientTests.swift +++ b/Tests/AWSLambdaRuntimeCoreTests/NewLambdaRuntimeClientTests.swift @@ -24,11 +24,11 @@ import struct Foundation.UUID @Suite struct NewLambdaRuntimeClientTests { - let logger = Logger(label: "NewLambdaClientRuntimeTest") - - init() { - - } + let logger = { + var logger = Logger(label: "NewLambdaClientRuntimeTest") + logger.logLevel = .trace + return logger + }() @Test func testSimpleInvocations() async throws { @@ -57,12 +57,12 @@ struct NewLambdaRuntimeClientTests { } } - try await self.withMockServer(behaviour: HappyBehavior()) { mockServer, eventLoopGroup in - let configuration = NewLambdaRuntimeClient.Configuration(ip: "127.0.0.1", port: 7000) + try await withMockServer(behaviour: HappyBehavior()) { port in + let configuration = NewLambdaRuntimeClient.Configuration(ip: "127.0.0.1", port: port) try await NewLambdaRuntimeClient.withRuntimeClient( configuration: configuration, - eventLoop: eventLoopGroup.next(), + eventLoop: NIOSingletons.posixEventLoopGroup.next(), logger: self.logger ) { runtimeClient in do { @@ -86,26 +86,4 @@ struct NewLambdaRuntimeClientTests { } } } - - func withMockServer( - behaviour: some LambdaServerBehavior, - _ body: (MockLambdaServer, MultiThreadedEventLoopGroup) async throws -> Result - ) async throws -> Result { - let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1) - let server = MockLambdaServer(behavior: behaviour) - _ = try await server.start().get() - - let result: Swift.Result - do { - result = .success(try await body(server, eventLoopGroup)) - } catch { - result = .failure(error) - } - - try? await server.stop().get() - try? await eventLoopGroup.shutdownGracefully() - - return try result.get() - } - }