@@ -97,3 +97,75 @@ open class AsyncDataPipeline<Input, Output> {
9797 }
9898 }
9999}
100+
101+ // MARK: - Type-Erased Pipelines (AnyPipeline)
102+
103+ /// A type-erased async pipeline stage that operates on Any input/output.
104+ /// Useful for dynamic pipelines where stages are assembled at runtime.
105+ public struct AnyAsyncPipelineStage : AsyncDataPipelineStage {
106+ public typealias Input = Any
107+ public typealias Output = Any
108+
109+ private let _process : ( Any ) async throws -> Any
110+
111+ public init < S: AsyncDataPipelineStage > ( _ stage: S ) {
112+ self . _process = { input in
113+ guard let typedInput = input as? S . Input else {
114+ throw PipelineError . invalidInputType ( expected: String ( describing: S . Input. self) , actual: String ( describing: type ( of: input) ) )
115+ }
116+ return try await stage. process ( typedInput)
117+ }
118+ }
119+
120+ public init ( process: @escaping ( Any ) async throws -> Any ) {
121+ self . _process = process
122+ }
123+
124+ public func process( _ input: Any ) async throws -> Any {
125+ return try await _process ( input)
126+ }
127+ }
128+
129+ /// Errors thrown by the pipeline
130+ public enum PipelineError : Error , LocalizedError {
131+ case invalidInputType( expected: String , actual: String )
132+ case stageFailure( stageIndex: Int , underlyingError: Error )
133+
134+ public var errorDescription : String ? {
135+ switch self {
136+ case . invalidInputType( let expected, let actual) :
137+ return " Pipeline stage expected input of type ' \( expected) ' but received ' \( actual) '. "
138+ case . stageFailure( let index, let error) :
139+ return " Pipeline execution failed at stage \( index) : \( error. localizedDescription) "
140+ }
141+ }
142+ }
143+
144+ /// A pipeline builder that chains type-erased stages dynamically.
145+ public class DynamicAsyncPipeline {
146+ private var stages : [ AnyAsyncPipelineStage ] = [ ]
147+
148+ public init ( ) { }
149+
150+ public func append( _ stage: AnyAsyncPipelineStage ) {
151+ stages. append ( stage)
152+ }
153+
154+ public func append< S: AsyncDataPipelineStage > ( _ stage: S ) {
155+ stages. append ( AnyAsyncPipelineStage ( stage) )
156+ }
157+
158+ public func execute( input: Any ) async throws -> Any {
159+ var currentData = input
160+
161+ for (index, stage) in stages. enumerated ( ) {
162+ do {
163+ currentData = try await stage. process ( currentData)
164+ } catch {
165+ throw PipelineError . stageFailure ( stageIndex: index, underlyingError: error)
166+ }
167+ }
168+
169+ return currentData
170+ }
171+ }
0 commit comments