@@ -197,7 +197,7 @@ public final class MSSQLConnection: SQLDatabase, @unchecked Sendable {
197197 private var isClosed : Bool = false
198198 /// True when the connection is still open and usable.
199199 public var isOpen : Bool { !isClosed }
200- private var msgReader : MessageReader ? // AsyncThrowingStream-based; no eventLoop hop per read
200+ private var msgReader : TDSFrameReader ? // AsyncThrowingStream-based; no eventLoop hop per read
201201 /// Tracks whether we are inside an explicit transaction (BEGIN TRANSACTION).
202202 private var inTransaction : Bool = false
203203 /// Current transaction descriptor — updated from ENVCHANGE type 8/9/10 responses.
@@ -240,15 +240,15 @@ public final class MSSQLConnection: SQLDatabase, @unchecked Sendable {
240240
241241 private func handshake( sslContext: NIOSSLContext ? = nil ) async throws {
242242 // 1. Add pipeline: TDSTLSFramer (pass-through initially) + framing + bridge
243- let bridge = AsyncStreamBridge ( )
243+ let bridge = TDSFrameBridge ( )
244244 // Swift 6: ByteToMessageHandler has Sendable marked unavailable (event-loop-bound).
245245 let bridgeBox = _UnsafeSendable ( bridge)
246246 let frameBox = _UnsafeSendable ( ByteToMessageHandler ( TDSFramingHandler ( ) ) )
247247 let framer = tlsFramer
248248 try await channel. eventLoop. submit {
249249 try self . channel. pipeline. syncOperations. addHandlers ( [ framer, frameBox. value, bridgeBox. value] )
250250 } . get ( )
251- msgReader = MessageReader ( bridge)
251+ msgReader = TDSFrameReader ( bridge)
252252
253253 // 2. Pre-Login — negotiate encryption preference
254254 let preLoginResp = try await sendPreLogin ( )
@@ -428,23 +428,53 @@ public final class MSSQLConnection: SQLDatabase, @unchecked Sendable {
428428
429429 /// Stream rows one-by-one as they are decoded from the TDS response.
430430 ///
431- /// The full TDS message is received before the first row is yielded (TDS framing
432- /// assembles the complete response), but the caller can process rows without
433- /// buffering the entire result set as an array.
431+ /// Rows are yielded incrementally as TDS packets arrive — without buffering
432+ /// the entire result set.
434433 public func queryStream( _ sql: String , _ binds: [ SQLValue ] = [ ] ) -> AsyncThrowingStream < SQLRow , Error > {
435434 AsyncThrowingStream { cont in
436435 Task { [ self ] in
437436 do {
438437 guard !self . isClosed else { throw SQLError . connectionClosed }
439- let dec : TDSTokenDecoder
438+
439+ // Send the query
440440 if binds. isEmpty {
441- dec = try await self . runBatchDecoder ( sql)
441+ var payload = encodeSQLBatch ( sql: sql)
442+ sendPacket ( type: . sqlBatch, payload: & payload)
442443 } else {
443- dec = try await self . runRPCDecoder ( Self . convertPlaceholders ( sql) , binds: binds)
444+ let rpc = TDSRPCRequest ( sql: Self . convertPlaceholders ( sql) , binds: binds)
445+ var payload = rpc. encode ( allocator: channel. allocator)
446+ sendPacket ( type: . rpc, payload: & payload)
444447 }
445- for row in dec. rows {
446- cont. yield ( row)
448+
449+ var dec = TDSTokenDecoder ( )
450+ var remainder : ByteBuffer ? = nil
451+
452+ outerLoop: while true {
453+ guard let frame = try await msgReader!. next ( ) else {
454+ throw SQLError . connectionClosed
455+ }
456+
457+ // Append this packet to any leftover bytes from the previous decode
458+ if remainder == nil || remainder!. readableBytes == 0 {
459+ remainder = frame. payload
460+ } else {
461+ var combined = channel. allocator. buffer (
462+ capacity: remainder!. readableBytes + frame. payload. readableBytes)
463+ combined. writeImmutableBuffer ( remainder!)
464+ combined. writeImmutableBuffer ( frame. payload)
465+ remainder = combined
466+ }
467+
468+ // Decode as many complete tokens as possible
469+ let rows = dec. decodePartial ( buffer: & remainder!)
470+ for row in rows { cont. yield ( row) }
471+
472+ if frame. isEOM { break outerLoop }
447473 }
474+
475+ if let err = dec. serverError { throw err }
476+ if let td = dec. transactionDescriptor { transactionDescriptor = td }
477+ dispatchInfoMessages ( dec)
448478 cont. finish ( )
449479 } catch {
450480 cont. finish ( throwing: error)
@@ -758,10 +788,8 @@ public final class MSSQLConnection: SQLDatabase, @unchecked Sendable {
758788
759789 /// Receive one complete TDS message via the async stream bridge handler.
760790 private func receivePacket( ) async throws -> ByteBuffer {
761- guard let reader = msgReader, let buf = try await reader. next ( ) else {
762- throw SQLError . connectionClosed
763- }
764- return buf
791+ guard let reader = msgReader else { throw SQLError . connectionClosed }
792+ return try await reader. receiveMessage ( )
765793 }
766794
767795 // MARK: - SQL Batch encoding
0 commit comments