Skip to content

Commit c464bf9

Browse files
authored
Don't hold a lock over a continuation in test helpers (#872)
Motivation: The various 'withMumbleContinuation' APIs are supposed to be invoked synchronously with the caller. This assumption allows a lock to be acquired before the call and released from the body of the 'withMumbleContinuation' after e.g. storing the continuation. However this isn't the case and the job may be re-enqueued on the executor meaning that this is pattern is vulnerable to deadlocks. Modifications: - Rework the test helpers to avoid holding a lock when a continuation is created. - Switch to using NIOLockedValue box Result: Lower chance of deadlock
1 parent 3c45dbd commit c464bf9

File tree

1 file changed

+127
-63
lines changed

1 file changed

+127
-63
lines changed

Tests/AsyncHTTPClientTests/AsyncTestHelpers.swift

Lines changed: 127 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -43,12 +43,11 @@ final class AsyncSequenceWriter<Element: Sendable>: AsyncSequence, @unchecked Se
4343
case failed(Error, CheckedContinuation<Void, Never>?)
4444
}
4545

46-
private var _state = State.buffering(.init(), nil)
47-
private let lock = NIOLock()
46+
private let state = NIOLockedValueBox<State>(.buffering([], nil))
4847

4948
public var hasDemand: Bool {
50-
self.lock.withLock {
51-
switch self._state {
49+
self.state.withLockedValue { state in
50+
switch state {
5251
case .failed, .finished, .buffering:
5352
return false
5453
case .waiting:
@@ -59,67 +58,132 @@ final class AsyncSequenceWriter<Element: Sendable>: AsyncSequence, @unchecked Se
5958

6059
/// Wait until a downstream consumer has issued more demand by calling `next`.
6160
public func demand() async {
62-
self.lock.lock()
61+
let shouldBuffer = self.state.withLockedValue { state in
62+
switch state {
63+
case .buffering(_, .none):
64+
return true
65+
case .waiting:
66+
return false
67+
case .buffering(_, .some), .failed(_, .some):
68+
preconditionFailure("Already waiting for demand. Invalid state: \(state)")
69+
case .finished, .failed:
70+
preconditionFailure("Invalid state: \(state)")
71+
}
72+
}
6373

64-
switch self._state {
65-
case .buffering(let buffer, .none):
74+
if shouldBuffer {
6675
await withCheckedContinuation { (continuation: CheckedContinuation<Void, Never>) in
67-
self._state = .buffering(buffer, continuation)
68-
self.lock.unlock()
76+
let shouldResumeContinuation = self.state.withLockedValue { state in
77+
switch state {
78+
case .buffering(let buffer, .none):
79+
state = .buffering(buffer, continuation)
80+
return false
81+
case .waiting:
82+
return true
83+
case .buffering(_, .some), .failed(_, .some):
84+
preconditionFailure("Already waiting for demand. Invalid state: \(state)")
85+
case .finished, .failed:
86+
preconditionFailure("Invalid state: \(state)")
87+
}
88+
}
89+
90+
if shouldResumeContinuation {
91+
continuation.resume()
92+
}
6993
}
70-
71-
case .waiting:
72-
self.lock.unlock()
73-
return
74-
75-
case .buffering(_, .some), .failed(_, .some):
76-
let state = self._state
77-
self.lock.unlock()
78-
preconditionFailure("Already waiting for demand. Invalid state: \(state)")
79-
80-
case .finished, .failed:
81-
let state = self._state
82-
self.lock.unlock()
83-
preconditionFailure("Invalid state: \(state)")
8494
}
8595
}
8696

97+
private enum NextAction {
98+
/// Resume the continuation if present, and return the result if present.
99+
case resumeAndReturn(CheckedContinuation<Void, Never>?, Result<Element?, Error>?)
100+
/// Suspend the current task and wait for the next value.
101+
case suspend
102+
}
103+
87104
private func next() async throws -> Element? {
88-
self.lock.lock()
89-
switch self._state {
90-
case .buffering(let buffer, let demandContinuation) where buffer.isEmpty:
91-
return try await withCheckedThrowingContinuation { continuation in
92-
self._state = .waiting(continuation)
93-
self.lock.unlock()
94-
demandContinuation?.resume(returning: ())
95-
}
105+
let action: NextAction = self.state.withLockedValue { state in
106+
switch state {
107+
case .buffering(var buffer, let demandContinuation):
108+
if buffer.isEmpty {
109+
return .suspend
110+
} else {
111+
let first = buffer.removeFirst()
112+
if first != nil {
113+
state = .buffering(buffer, demandContinuation)
114+
} else {
115+
state = .finished
116+
}
117+
return .resumeAndReturn(nil, .success(first))
118+
}
119+
120+
case .failed(let error, let demandContinuation):
121+
state = .finished
122+
return .resumeAndReturn(demandContinuation, .failure(error))
123+
124+
case .finished:
125+
return .resumeAndReturn(nil, .success(nil))
96126

97-
case .buffering(var buffer, let demandContinuation):
98-
let first = buffer.removeFirst()
99-
if first != nil {
100-
self._state = .buffering(buffer, demandContinuation)
101-
} else {
102-
self._state = .finished
127+
case .waiting:
128+
preconditionFailure(
129+
"Expected that there is always only one concurrent call to next. Invalid state: \(state)"
130+
)
103131
}
104-
self.lock.unlock()
105-
return first
132+
}
106133

107-
case .failed(let error, let demandContinuation):
108-
self._state = .finished
109-
self.lock.unlock()
134+
switch action {
135+
case .resumeAndReturn(let demandContinuation, let result):
110136
demandContinuation?.resume()
111-
throw error
112-
113-
case .finished:
114-
self.lock.unlock()
115-
return nil
116-
117-
case .waiting:
118-
let state = self._state
119-
self.lock.unlock()
120-
preconditionFailure(
121-
"Expected that there is always only one concurrent call to next. Invalid state: \(state)"
122-
)
137+
return try result?.get()
138+
139+
case .suspend:
140+
// Holding the lock here *should* be safe but because of a bug in the runtime
141+
// it isn't, so drop the lock, create the continuation and then try again.
142+
//
143+
// See https://github.com/swiftlang/swift/issues/85668
144+
return try await withCheckedThrowingContinuation {
145+
(continuation: CheckedContinuation<Element?, any Error>) in
146+
let action: NextAction = self.state.withLockedValue { state in
147+
switch state {
148+
case .buffering(var buffer, let demandContinuation):
149+
if buffer.isEmpty {
150+
state = .waiting(continuation)
151+
return .resumeAndReturn(demandContinuation, nil)
152+
} else {
153+
let first = buffer.removeFirst()
154+
if first != nil {
155+
state = .buffering(buffer, demandContinuation)
156+
} else {
157+
state = .finished
158+
}
159+
return .resumeAndReturn(nil, .success(first))
160+
}
161+
162+
case .failed(let error, let demandContinuation):
163+
state = .finished
164+
return .resumeAndReturn(demandContinuation, .failure(error))
165+
166+
case .finished:
167+
return .resumeAndReturn(nil, .success(nil))
168+
169+
case .waiting:
170+
preconditionFailure(
171+
"Expected that there is always only one concurrent call to next. Invalid state: \(state)"
172+
)
173+
}
174+
}
175+
176+
switch action {
177+
case .resumeAndReturn(let demandContinuation, let result):
178+
demandContinuation?.resume()
179+
// Resume the continuation rather than returning th result.
180+
if let result {
181+
continuation.resume(with: result)
182+
}
183+
case .suspend:
184+
preconditionFailure() // Not returned from the code above.
185+
}
186+
}
123187
}
124188
}
125189

@@ -137,19 +201,19 @@ final class AsyncSequenceWriter<Element: Sendable>: AsyncSequence, @unchecked Se
137201
}
138202

139203
private func writeBufferOrEnd(_ element: Element?) {
140-
let writeAction = self.lock.withLock { () -> WriteAction in
141-
switch self._state {
204+
let writeAction = self.state.withLockedValue { state -> WriteAction in
205+
switch state {
142206
case .buffering(var buffer, let continuation):
143207
buffer.append(element)
144-
self._state = .buffering(buffer, continuation)
208+
state = .buffering(buffer, continuation)
145209
return .none
146210

147211
case .waiting(let continuation):
148-
self._state = .buffering(.init(), nil)
212+
state = .buffering(.init(), nil)
149213
return .succeedContinuation(continuation, element)
150214

151215
case .finished, .failed:
152-
preconditionFailure("Invalid state: \(self._state)")
216+
preconditionFailure("Invalid state: \(state)")
153217
}
154218
}
155219

@@ -170,17 +234,17 @@ final class AsyncSequenceWriter<Element: Sendable>: AsyncSequence, @unchecked Se
170234
/// Drops all buffered writes and emits an error on the waiting `next`. If there is no call to `next`
171235
/// waiting, will emit the error on the next call to `next`.
172236
public func fail(_ error: Error) {
173-
let errorAction = self.lock.withLock { () -> ErrorAction in
174-
switch self._state {
237+
let errorAction = self.state.withLockedValue { state -> ErrorAction in
238+
switch state {
175239
case .buffering(_, let demandContinuation):
176-
self._state = .failed(error, demandContinuation)
240+
state = .failed(error, demandContinuation)
177241
return .none
178242

179243
case .failed, .finished:
180244
return .none
181245

182246
case .waiting(let continuation):
183-
self._state = .finished
247+
state = .finished
184248
return .failContinuation(continuation, error)
185249
}
186250
}

0 commit comments

Comments
 (0)