Skip to content

Commit 0eede83

Browse files
committed
Progress on observing connection close
1 parent fc969df commit 0eede83

File tree

2 files changed

+90
-17
lines changed

2 files changed

+90
-17
lines changed

Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeClient.swift

+88-17
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ import Logging
1616
import NIOCore
1717
import NIOHTTP1
1818
import NIOPosix
19-
import _NIOBase64
2019

2120
final actor NewLambdaRuntimeClient: LambdaRuntimeClientProtocol {
2221
nonisolated let unownedExecutor: UnownedSerialExecutor
@@ -69,11 +68,22 @@ final actor NewLambdaRuntimeClient: LambdaRuntimeClientProtocol {
6968
case sentResponse(requestID: String)
7069
}
7170

71+
enum ClosingState {
72+
case notClosing
73+
case closing(CheckedContinuation<Void, Never>)
74+
}
75+
7276
private let eventLoop: any EventLoop
7377
private let logger: Logger
7478
private let configuration: Configuration
79+
7580
private var connectionState: ConnectionState = .disconnected
7681
private var lambdaState: LambdaState = .idle(previousRequestID: nil)
82+
private var closingState: ClosingState = .notClosing
83+
84+
// connections that are currently being closed. In the `run` method we must await all of them
85+
// being fully closed before we can return from it.
86+
private var closingConnections: [any Channel] = []
7787

7888
static func withRuntimeClient<Result>(
7989
configuration: Configuration,
@@ -89,6 +99,8 @@ final actor NewLambdaRuntimeClient: LambdaRuntimeClientProtocol {
8999
result = .failure(error)
90100
}
91101

102+
await runtime.close()
103+
92104
//try? await runtime.close()
93105
return try result.get()
94106
}
@@ -100,6 +112,31 @@ final actor NewLambdaRuntimeClient: LambdaRuntimeClientProtocol {
100112
self.logger = logger
101113
}
102114

115+
private func close() async {
116+
guard case .notClosing = self.closingState else {
117+
return
118+
}
119+
await withCheckedContinuation { continuation in
120+
self.closingState = .closing(continuation)
121+
122+
switch self.connectionState {
123+
case .disconnected:
124+
break
125+
126+
case .connecting(let continuations):
127+
for continuation in continuations {
128+
continuation.resume(throwing: NewLambdaRuntimeError(code: .closingRuntimeClient))
129+
}
130+
self.connectionState = .connecting([])
131+
132+
case .connected(let channel, let lambdaChannelHandler):
133+
channel.clo
134+
}
135+
}
136+
137+
138+
}
139+
103140
func nextInvocation() async throws -> (Invocation, Writer) {
104141
switch self.lambdaState {
105142
case .idle:
@@ -190,7 +227,20 @@ final actor NewLambdaRuntimeClient: LambdaRuntimeClientProtocol {
190227
}
191228

192229
private func channelClosed(_ channel: any Channel) {
193-
// TODO: Fill out
230+
switch self.connectionState {
231+
case .disconnected:
232+
break
233+
234+
case .connecting(let array):
235+
self.connectionState = .disconnected
236+
237+
for continuation in array {
238+
continuation.resume(throwing: NewLambdaRuntimeError(code: .lostConnectionToControlPlane))
239+
}
240+
241+
case .connected:
242+
self.connectionState = .disconnected
243+
}
194244
}
195245

196246
private func makeOrGetConnection() async throws -> LambdaChannelHandler<NewLambdaRuntimeClient> {
@@ -227,6 +277,7 @@ final actor NewLambdaRuntimeClient: LambdaRuntimeClientProtocol {
227277
return channel.eventLoop.makeFailedFuture(error)
228278
}
229279
}
280+
.connectTimeout(.seconds(2))
230281

231282
do {
232283
// connect directly via socket address to avoid happy eyeballs (perf)
@@ -279,6 +330,35 @@ extension NewLambdaRuntimeClient: LambdaChannelHandlerDelegate {
279330
}
280331

281332
nonisolated func connectionWillClose(channel: any Channel) {
333+
self.assumeIsolated { isolated in
334+
switch isolated.connectionState {
335+
case .disconnected:
336+
// this case should never happen. But whatever
337+
if channel.isActive {
338+
isolated.closingConnections.append(channel)
339+
}
340+
341+
case .connecting(let continuations):
342+
// this case should never happen. But whatever
343+
if channel.isActive {
344+
isolated.closingConnections.append(channel)
345+
}
346+
347+
for continuation in continuations {
348+
continuation.resume(throwing: NewLambdaRuntimeError(code: .connectionToControlPlaneLost))
349+
}
350+
351+
case .connected(let stateChannel, _):
352+
guard channel === stateChannel else {
353+
isolated.closingConnections.append(channel)
354+
return
355+
}
356+
357+
isolated.connectionState = .disconnected
358+
359+
360+
}
361+
}
282362

283363
}
284364
}
@@ -288,7 +368,7 @@ private protocol LambdaChannelHandlerDelegate {
288368
func connectionErrorHappened(_ error: any Error, channel: any Channel)
289369
}
290370

291-
private final class LambdaChannelHandler<Delegate> {
371+
private final class LambdaChannelHandler<Delegate: LambdaChannelHandlerDelegate> {
292372
let nextInvocationPath = Consts.invocationURLPrefix + Consts.getNextInvocationURLSuffix
293373

294374
enum State {
@@ -652,27 +732,18 @@ extension LambdaChannelHandler: ChannelInboundHandler {
652732

653733
func errorCaught(context: ChannelHandlerContext, error: Error) {
654734
// pending responses will fail with lastError in channelInactive since we are calling context.close
735+
self.delegate.connectionErrorHappened(error, channel: context.channel)
736+
655737
self.lastError = error
656738
context.channel.close(promise: nil)
657739
}
658740

659741
func channelInactive(context: ChannelHandlerContext) {
660742
// fail any pending responses with last error or assume peer disconnected
661-
context.fireChannelInactive()
662743

663-
// switch self.state {
664-
// case .idle:
665-
// break
666-
//
667-
// case .running(let promise, let timeout):
668-
// self.state = .idle
669-
// timeout?.cancel()
670-
// promise.fail(self.lastError ?? HTTPClient.Errors.connectionResetByPeer)
671-
//
672-
// case .waitForConnectionClose(let response, let promise):
673-
// self.state = .idle
674-
// promise.succeed(response)
675-
// }
744+
// we don't need to forward channelInactive to the delegate, as the delegate observes the
745+
// closeFuture
746+
context.fireChannelInactive()
676747
}
677748
}
678749

Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeError.swift

+2
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414

1515
struct NewLambdaRuntimeError: Error {
1616
enum Code {
17+
case closingRuntimeClient
18+
1719
case connectionToControlPlaneLost
1820
case connectionToControlPlaneGoingAway
1921
case invocationMissingMetadata

0 commit comments

Comments
 (0)