11package omniparser
22
33import (
4- "io"
5-
64 "github.com/jf-tech/omniparser/errs"
75 "github.com/jf-tech/omniparser/schemahandler"
86)
@@ -11,56 +9,50 @@ import (
119// operation. An instance of a Transform must not be shared and reused among different
1210// input streams. An instance of a Transform must not be used across multiple goroutines.
1311type Transform interface {
14- // Next indicates whether the ingestion/transform is completed or not.
15- Next () bool
1612 // Read returns a JSON byte slice representing one ingested and transformed record.
13+ // io.EOF should be returned when input stream is completely consumed and future calls
14+ // to Read should always return io.EOF.
15+ // errs.ErrTransformFailed should be returned when a record ingestion and transformation
16+ // failed and such failure isn't considered fatal. Future calls to Read will attempt
17+ // new record ingestion and transformations.
18+ // Any other error returned is considered fatal and future calls to Read will always
19+ // return the same error.
20+ // Note if returned error isn't nil, then returned []byte will be nil.
1721 Read () ([]byte , error )
1822}
1923
2024type transform struct {
21- ingester schemahandler.Ingester
22- curErr error
23- curRecord []byte
25+ ingester schemahandler.Ingester
26+ lastErr error
2427}
2528
26- // Next calls the underlying schema handler's ingester to do the ingestion and transformation, saves
27- // the resulting record and/or error, and returns whether the entire operation is completed or not.
28- func (o * transform ) Next () bool {
29- // ErrTransformFailed is a generic wrapping error around all handlers' ingesters'
29+ // Read returns a JSON byte slice representing one ingested and transformed record.
30+ // io.EOF should be returned when input stream is completely consumed and future calls
31+ // to Read should always return io.EOF.
32+ // errs.ErrTransformFailed should be returned when a record ingestion and transformation
33+ // failed and such failure isn't considered fatal. Future calls to Read will attempt
34+ // new record ingestion and transformations.
35+ // Any other error returned is considered fatal and future calls to Read will always
36+ // return the same error.
37+ // Note if returned error isn't nil, then returned []byte will be nil.
38+ func (o * transform ) Read () ([]byte , error ) {
39+ // errs.ErrTransformFailed is a generic wrapping error around all handlers' ingesters'
3040 // **continuable** errors (so client side doesn't have to deal with myriad of different
31- // types of benign continuable errors. All other errors: non-continuable errors or ErrEOF
41+ // types of benign continuable errors. All other errors: non-continuable errors or io.EOF
3242 // should cause the operation to cease.
33- if o .curErr != nil && ! errs .IsErrTransformFailed (o .curErr ) {
34- return false
43+ if o .lastErr != nil && ! errs .IsErrTransformFailed (o .lastErr ) {
44+ return nil , o . lastErr
3545 }
36-
37- for {
38- record , err := o .ingester .Read ()
39- switch {
40- case err == nil :
41- o .curRecord = record
42- o .curErr = nil
43- return true
44- case err == io .EOF :
45- o .curErr = err
46- o .curRecord = nil
47- // No more processing needed.
48- return false
49- default :
50- o .curErr = err
46+ record , err := o .ingester .Read ()
47+ if err != nil {
48+ if o .ingester .IsContinuableError (err ) {
5149 // If ingester error is continuable, wrap it into a standard generic ErrTransformFailed
5250 // so caller has an easier time to deal with it. If fatal error, then leave it raw to the
5351 // caller so they can decide what it is and how to proceed.
54- if o .ingester .IsContinuableError (err ) {
55- o .curErr = errs .ErrTransformFailed (err .Error ())
56- }
57- o .curRecord = nil
58- return true
52+ err = errs .ErrTransformFailed (err .Error ())
5953 }
54+ record = nil
6055 }
61- }
62-
63- // Read returns the current ingested and transformed record and/or the current error.
64- func (o * transform ) Read () ([]byte , error ) {
65- return o .curRecord , o .curErr
56+ o .lastErr = err
57+ return record , err
6658}
0 commit comments