From e9e852c02977b02e6a5f03da47d6253cf370ddb2 Mon Sep 17 00:00:00 2001 From: Artem Redkin Date: Mon, 15 Jun 2020 16:07:43 +0100 Subject: [PATCH 1/3] all internal connection flow should be executed --- Sources/AsyncHTTPClient/ConnectionPool.swift | 24 +-- .../AsyncHTTPClient/ConnectionsState.swift | 18 +- .../ConnectionPoolTests.swift | 162 ++++++++++++++++++ 3 files changed, 187 insertions(+), 17 deletions(-) diff --git a/Sources/AsyncHTTPClient/ConnectionPool.swift b/Sources/AsyncHTTPClient/ConnectionPool.swift index f18a30a98..b2aaf8504 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool.swift @@ -374,15 +374,6 @@ class HTTP1ConnectionProvider { "ahc-action": "\(action)"]) connection.channel.close(promise: nil) self.execute(action, logger: logger) - case .cancel(let connection, let close): - logger.trace("cancelling connection", - metadata: ["ahc-connection": "\(connection)", - "ahc-close": "\(close)"]) - connection.cancel().whenComplete { _ in - if close { - self.closeAndDelete() - } - } case .fail(let waiter, let error): logger.debug("failing connection for waiter", metadata: ["ahc-waiter": "\(waiter)", @@ -428,7 +419,15 @@ class HTTP1ConnectionProvider { } return self.state.offer(connection: connection) } - waiter.promise.succeed(connection) + + switch action { + case .closeAnd: + // This happens when client was shut down during connect + connection.channel.close(promise: nil) + waiter.promise.fail(HTTPClientError.cancelled) + default: + waiter.promise.succeed(connection) + } case .failure(let error): logger.debug("connection attempt failed", metadata: ["ahc-error": "\(error)"]) @@ -437,6 +436,7 @@ class HTTP1ConnectionProvider { } waiter.promise.fail(error) } + waiter.setupComplete.whenComplete { _ in self.execute(action, logger: logger) } @@ -456,7 +456,7 @@ class HTTP1ConnectionProvider { // Since both `.park` and `.deleteProvider` are terminal in terms of execution, // we can execute them immediately self.execute(action, logger: logger) - case .cancel, .closeAnd, .create, .fail, .lease, .parkAnd, .replace: + case .closeAnd, .create, .fail, .lease, .parkAnd, .replace: // This is needed to start a new stack, otherwise, since this is called on a previous // future completion handler chain, it will be growing indefinitely until the connection is closed. // We might revisit this when https://github.com/apple/swift-nio/issues/970 is resolved. @@ -493,7 +493,7 @@ class HTTP1ConnectionProvider { $0.promise.fail(HTTPClientError.cancelled) } - if available.isEmpty, leased.isEmpty { + if available.isEmpty, leased.isEmpty, clean { self.closePromise.succeed(()) return self.closePromise.futureResult.map { clean } } diff --git a/Sources/AsyncHTTPClient/ConnectionsState.swift b/Sources/AsyncHTTPClient/ConnectionsState.swift index 0c5f908e2..d99416a49 100644 --- a/Sources/AsyncHTTPClient/ConnectionsState.swift +++ b/Sources/AsyncHTTPClient/ConnectionsState.swift @@ -23,7 +23,6 @@ extension HTTP1ConnectionProvider { case park(Connection) case none case fail(Waiter, Error) - case cancel(Connection, Bool) indirect case closeAnd(Connection, Action) indirect case parkAnd(Connection, Action) } @@ -209,8 +208,9 @@ extension HTTP1ConnectionProvider { case .active: self.leasedConnections.insert(ConnectionKey(connection)) return .none - case .closed: // This can happen when we close the client while connections was being estableshed - return .cancel(connection, self.isEmpty) + case .closed: // This can happen when we close the client while connections was being established + self.openedConnectionsCount -= 1 + return .closeAnd(connection, self.processNextWaiter()) } } @@ -233,7 +233,9 @@ extension HTTP1ConnectionProvider { // user calls `syncShutdown` before we received an error from the bootstrap. In this scenario, // pool will be `.closed` but connection will be still in the process of being established/failed, // so then this process finishes, it will get to this point. - return .none + // We need to call `processNextWaiter` to finish deleting provider from the pool. + self.openedConnectionsCount -= 1 + return self.processNextWaiter() } } @@ -273,7 +275,13 @@ extension HTTP1ConnectionProvider { return .closeAnd(connection, self.processNextWaiter()) case .closed: - return .none + // This situation can happen when we call close, state changes, but before we call `close` on all + // available connections, in this case we should close this connection and, potentially, + // delete the provider + self.openedConnectionsCount -= 1 + self.availableConnections.removeAll { $0 === connection } + + return .closeAnd(connection, self.processNextWaiter()) } } diff --git a/Tests/AsyncHTTPClientTests/ConnectionPoolTests.swift b/Tests/AsyncHTTPClientTests/ConnectionPoolTests.swift index bacc04dd8..ca0979345 100644 --- a/Tests/AsyncHTTPClientTests/ConnectionPoolTests.swift +++ b/Tests/AsyncHTTPClientTests/ConnectionPoolTests.swift @@ -1391,6 +1391,168 @@ class ConnectionPoolTests: XCTestCase { } } + // MARK: - Shutdown tests + + func testShutdownOnPendingAndSuccess() { + var state = HTTP1ConnectionProvider.ConnectionsState(eventLoop: self.eventLoop) + + XCTAssertTrue(state.enqueue()) + + let connectionPromise = self.eventLoop.makePromise(of: Connection.self) + let setupPromise = self.eventLoop.makePromise(of: Void.self) + let waiter = HTTP1ConnectionProvider.Waiter(promise: connectionPromise, setupComplete: setupPromise.futureResult, preference: .indifferent) + var action = state.acquire(waiter: waiter) + + switch action { + case .create: + // expected + break + default: + XCTFail("Unexpected action: \(action)") + } + + let snapshot = state.testsOnly_getInternalState() + XCTAssertEqual(snapshot.openedConnectionsCount, 1) + + if let (waiters, available, leased, clean) = state.close() { + XCTAssertTrue(waiters.isEmpty) + XCTAssertTrue(available.isEmpty) + XCTAssertTrue(leased.isEmpty) + XCTAssertFalse(clean) + } else { + XCTFail("Expecting snapshot") + } + + let connection = Connection(channel: EmbeddedChannel(), provider: self.http1ConnectionProvider) + + action = state.offer(connection: connection) + switch action { + case .closeAnd(_, let next): + switch next { + case .closeProvider: + // expected + break + default: + XCTFail("Unexpected action: \(action)") + } + default: + XCTFail("Unexpected action: \(action)") + } + + connectionPromise.fail(TempError()) + setupPromise.succeed(()) + } + + func testShutdownOnPendingAndError() { + var state = HTTP1ConnectionProvider.ConnectionsState(eventLoop: self.eventLoop) + + XCTAssertTrue(state.enqueue()) + + let connectionPromise = self.eventLoop.makePromise(of: Connection.self) + let setupPromise = self.eventLoop.makePromise(of: Void.self) + let waiter = HTTP1ConnectionProvider.Waiter(promise: connectionPromise, setupComplete: setupPromise.futureResult, preference: .indifferent) + var action = state.acquire(waiter: waiter) + + switch action { + case .create: + // expected + break + default: + XCTFail("Unexpected action: \(action)") + } + + let snapshot = state.testsOnly_getInternalState() + XCTAssertEqual(snapshot.openedConnectionsCount, 1) + + if let (waiters, available, leased, clean) = state.close() { + XCTAssertTrue(waiters.isEmpty) + XCTAssertTrue(available.isEmpty) + XCTAssertTrue(leased.isEmpty) + XCTAssertFalse(clean) + } else { + XCTFail("Expecting snapshot") + } + + action = state.connectFailed() + switch action { + case .closeProvider: + // expected + break + default: + XCTFail("Unexpected action: \(action)") + } + + connectionPromise.fail(TempError()) + setupPromise.succeed(()) + } + + func testShutdownTimeout() { + var state = HTTP1ConnectionProvider.ConnectionsState(eventLoop: self.eventLoop) + + var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() + + let connection = Connection(channel: EmbeddedChannel(), provider: self.http1ConnectionProvider) + snapshot.availableConnections.append(connection) + snapshot.openedConnectionsCount = 1 + + state.testsOnly_setInternalState(snapshot) + + if let (waiters, available, leased, clean) = state.close() { + XCTAssertTrue(waiters.isEmpty) + XCTAssertFalse(available.isEmpty) + XCTAssertTrue(leased.isEmpty) + XCTAssertTrue(clean) + } else { + XCTFail("Expecting snapshot") + } + + let action = state.timeout(connection: connection) + switch action { + case .closeAnd(_, let next): + switch next { + case .closeProvider: + // expected + break + default: + XCTFail("Unexpected action: \(action)") + } + default: + XCTFail("Unexpected action: \(action)") + } + } + + func testShutdownRemoteClosed() { + var state = HTTP1ConnectionProvider.ConnectionsState(eventLoop: self.eventLoop) + + var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() + + let connection = Connection(channel: EmbeddedChannel(), provider: self.http1ConnectionProvider) + snapshot.availableConnections.append(connection) + snapshot.openedConnectionsCount = 1 + + state.testsOnly_setInternalState(snapshot) + + if let (waiters, available, leased, clean) = state.close() { + XCTAssertTrue(waiters.isEmpty) + XCTAssertFalse(available.isEmpty) + XCTAssertTrue(leased.isEmpty) + XCTAssertTrue(clean) + } else { + XCTFail("Expecting snapshot") + } + + let action = state.remoteClosed(connection: connection) + switch action { + case .closeProvider: + // expected + break + default: + XCTFail("Unexpected action: \(action)") + } + } + + // MARK: - Helpers + override func setUp() { XCTAssertNil(self.eventLoop) XCTAssertNil(self.http1ConnectionProvider) From 0aeb53562d92840553ac04ac34a170196468acae Mon Sep 17 00:00:00 2001 From: Artem Redkin Date: Fri, 19 Jun 2020 14:52:10 +0100 Subject: [PATCH 2/3] swiftformat and linux tests --- Tests/AsyncHTTPClientTests/ConnectionPoolTests+XCTest.swift | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/Tests/AsyncHTTPClientTests/ConnectionPoolTests+XCTest.swift b/Tests/AsyncHTTPClientTests/ConnectionPoolTests+XCTest.swift index b751ec1ce..04cfa6421 100644 --- a/Tests/AsyncHTTPClientTests/ConnectionPoolTests+XCTest.swift +++ b/Tests/AsyncHTTPClientTests/ConnectionPoolTests+XCTest.swift @@ -62,6 +62,10 @@ extension ConnectionPoolTests { ("testConnectionRemoteCloseRelease", testConnectionRemoteCloseRelease), ("testConnectionTimeoutRelease", testConnectionTimeoutRelease), ("testAcquireAvailableBecomesUnavailable", testAcquireAvailableBecomesUnavailable), + ("testShutdownOnPendingAndSuccess", testShutdownOnPendingAndSuccess), + ("testShutdownOnPendingAndError", testShutdownOnPendingAndError), + ("testShutdownTimeout", testShutdownTimeout), + ("testShutdownRemoteClosed", testShutdownRemoteClosed), ] } } From d3240aad8fd160fb4090993712e35da43cf8d7a8 Mon Sep 17 00:00:00 2001 From: Artem Redkin Date: Mon, 22 Jun 2020 16:03:07 +0100 Subject: [PATCH 3/3] review fixes --- Sources/AsyncHTTPClient/ConnectionPool.swift | 2 + .../ConnectionPoolTests.swift | 41 ++++++------------- 2 files changed, 14 insertions(+), 29 deletions(-) diff --git a/Sources/AsyncHTTPClient/ConnectionPool.swift b/Sources/AsyncHTTPClient/ConnectionPool.swift index b2aaf8504..ce648469c 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool.swift @@ -423,6 +423,8 @@ class HTTP1ConnectionProvider { switch action { case .closeAnd: // This happens when client was shut down during connect + logger.trace("connection cancelled due to client shutdown", + metadata: ["ahc-connection": "\(channel)"]) connection.channel.close(promise: nil) waiter.promise.fail(HTTPClientError.cancelled) default: diff --git a/Tests/AsyncHTTPClientTests/ConnectionPoolTests.swift b/Tests/AsyncHTTPClientTests/ConnectionPoolTests.swift index ca0979345..aee407b28 100644 --- a/Tests/AsyncHTTPClientTests/ConnectionPoolTests.swift +++ b/Tests/AsyncHTTPClientTests/ConnectionPoolTests.swift @@ -1403,12 +1403,9 @@ class ConnectionPoolTests: XCTestCase { let waiter = HTTP1ConnectionProvider.Waiter(promise: connectionPromise, setupComplete: setupPromise.futureResult, preference: .indifferent) var action = state.acquire(waiter: waiter) - switch action { - case .create: - // expected - break - default: - XCTFail("Unexpected action: \(action)") + guard case .create = action else { + XCTFail("unexpected action \(action)") + return } let snapshot = state.testsOnly_getInternalState() @@ -1426,17 +1423,9 @@ class ConnectionPoolTests: XCTestCase { let connection = Connection(channel: EmbeddedChannel(), provider: self.http1ConnectionProvider) action = state.offer(connection: connection) - switch action { - case .closeAnd(_, let next): - switch next { - case .closeProvider: - // expected - break - default: - XCTFail("Unexpected action: \(action)") - } - default: - XCTFail("Unexpected action: \(action)") + guard case .closeAnd(_, .closeProvider) = action else { + XCTFail("unexpected action \(action)") + return } connectionPromise.fail(TempError()) @@ -1453,12 +1442,9 @@ class ConnectionPoolTests: XCTestCase { let waiter = HTTP1ConnectionProvider.Waiter(promise: connectionPromise, setupComplete: setupPromise.futureResult, preference: .indifferent) var action = state.acquire(waiter: waiter) - switch action { - case .create: - // expected - break - default: - XCTFail("Unexpected action: \(action)") + guard case .create = action else { + XCTFail("unexpected action \(action)") + return } let snapshot = state.testsOnly_getInternalState() @@ -1474,12 +1460,9 @@ class ConnectionPoolTests: XCTestCase { } action = state.connectFailed() - switch action { - case .closeProvider: - // expected - break - default: - XCTFail("Unexpected action: \(action)") + guard case .closeProvider = action else { + XCTFail("unexpected action \(action)") + return } connectionPromise.fail(TempError())