From cf6863c8cffbe83d7c8f741f7fd5c65c7cb5ebda Mon Sep 17 00:00:00 2001 From: a3hadi Date: Tue, 30 Apr 2024 14:55:45 -0400 Subject: [PATCH 1/9] chore: bypass transformer call if UDTransformer is not defined Signed-off-by: a3hadi --- .../forward/applier/sourcetransformer.go | 9 --- pkg/sources/forward/data_forward.go | 65 +++++++++++-------- pkg/sources/generator/tickgen_test.go | 7 +- pkg/sources/http/http_test.go | 3 +- pkg/sources/kafka/handler_test.go | 3 +- pkg/sources/kafka/reader_test.go | 9 ++- pkg/sources/nats/nats_test.go | 3 +- pkg/sources/source.go | 4 +- 8 files changed, 49 insertions(+), 54 deletions(-) diff --git a/pkg/sources/forward/applier/sourcetransformer.go b/pkg/sources/forward/applier/sourcetransformer.go index bd81436190..795cd4c5a2 100644 --- a/pkg/sources/forward/applier/sourcetransformer.go +++ b/pkg/sources/forward/applier/sourcetransformer.go @@ -35,12 +35,3 @@ type ApplySourceTransformFunc func(ctx context.Context, message *isb.ReadMessage func (f ApplySourceTransformFunc) ApplyTransform(ctx context.Context, message *isb.ReadMessage) ([]*isb.WriteMessage, error) { return f(ctx, message) } - -var ( - // Terminal Applier do not make any change to the message - Terminal = ApplySourceTransformFunc(func(ctx context.Context, msg *isb.ReadMessage) ([]*isb.WriteMessage, error) { - return []*isb.WriteMessage{{ - Message: msg.Message, - }}, nil - }) -) diff --git a/pkg/sources/forward/data_forward.go b/pkg/sources/forward/data_forward.go index b125667439..a066a5677f 100644 --- a/pkg/sources/forward/data_forward.go +++ b/pkg/sources/forward/data_forward.go @@ -278,35 +278,44 @@ func (df *DataForward) forwardAChunk(ctx context.Context) { // applyTransformer, if there is an Internal error, it is a blocking call and // will return only if shutdown has been initiated. - // create a pool of Transformer Processors - var wg sync.WaitGroup - for i := 0; i < df.opts.transformerConcurrency; i++ { - wg.Add(1) - go func() { - defer wg.Done() - df.concurrentApplyTransformer(ctx, transformerCh) - }() - } - concurrentTransformerProcessingStart := time.Now() - - // send to transformer only the data messages - for idx, m := range readMessages { - // emit message size metric - metrics.ReadBytesCount.With(map[string]string{metrics.LabelVertex: df.vertexName, metrics.LabelPipeline: df.pipelineName, metrics.LabelVertexType: string(dfv1.VertexTypeSource), metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), metrics.LabelPartitionName: df.reader.GetName()}).Add(float64(len(m.Payload))) - // assign watermark to the message - m.Watermark = time.Time(processorWM) - // send transformer processing work to the channel - transformerResults[idx].readMessage = m - transformerCh <- &transformerResults[idx] + // If a user defined transformer exists, apply it + if df.transformer != nil { + // create a pool of Transformer Processors + var wg sync.WaitGroup + for i := 0; i < df.opts.transformerConcurrency; i++ { + wg.Add(1) + go func() { + defer wg.Done() + df.concurrentApplyTransformer(ctx, transformerCh) + }() + } + concurrentTransformerProcessingStart := time.Now() + + // send to transformer only the data messages + for idx, m := range readMessages { + // emit message size metric + metrics.ReadBytesCount.With(map[string]string{metrics.LabelVertex: df.vertexName, metrics.LabelPipeline: df.pipelineName, metrics.LabelVertexType: string(dfv1.VertexTypeSource), metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), metrics.LabelPartitionName: df.reader.GetName()}).Add(float64(len(m.Payload))) + // assign watermark to the message + m.Watermark = time.Time(processorWM) + // send transformer processing work to the channel + transformerResults[idx].readMessage = m + transformerCh <- &transformerResults[idx] + } + // let the go routines know that there is no more work + close(transformerCh) + // wait till the processing is done. this will not be an infinite wait because the transformer processing will exit if + // context.Done() is closed. + wg.Wait() + df.opts.logger.Debugw("concurrent applyTransformer completed", zap.Int("concurrency", df.opts.transformerConcurrency), zap.Duration("took", time.Since(concurrentTransformerProcessingStart))) + metrics.SourceTransformerConcurrentProcessingTime.With(map[string]string{metrics.LabelVertex: df.vertexName, metrics.LabelPipeline: df.pipelineName, metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), metrics.LabelPartitionName: df.reader.GetName()}).Observe(float64(time.Since(concurrentTransformerProcessingStart).Microseconds())) + // transformer processing is done. + } else { + // if no user-defined transformer exists, then the messages to write will be identical to the messages read from source + for idx, m := range readMessages { + transformerResults[idx].readMessage = m + transformerResults[idx].writeMessages = []*isb.WriteMessage{{Message: m.Message}} + } } - // let the go routines know that there is no more work - close(transformerCh) - // wait till the processing is done. this will not be an infinite wait because the transformer processing will exit if - // context.Done() is closed. - wg.Wait() - df.opts.logger.Debugw("concurrent applyTransformer completed", zap.Int("concurrency", df.opts.transformerConcurrency), zap.Duration("took", time.Since(concurrentTransformerProcessingStart))) - metrics.SourceTransformerConcurrentProcessingTime.With(map[string]string{metrics.LabelVertex: df.vertexName, metrics.LabelPipeline: df.pipelineName, metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), metrics.LabelPartitionName: df.reader.GetName()}).Observe(float64(time.Since(concurrentTransformerProcessingStart).Microseconds())) - // transformer processing is done. // publish source watermark and assign IsLate attribute based on new event time. var writeMessages []*isb.WriteMessage diff --git a/pkg/sources/generator/tickgen_test.go b/pkg/sources/generator/tickgen_test.go index 8beb774a60..fbacc9def2 100644 --- a/pkg/sources/generator/tickgen_test.go +++ b/pkg/sources/generator/tickgen_test.go @@ -28,7 +28,6 @@ import ( "github.com/numaproj/numaflow/pkg/forwarder" "github.com/numaproj/numaflow/pkg/isb" "github.com/numaproj/numaflow/pkg/isb/stores/simplebuffer" - "github.com/numaproj/numaflow/pkg/sources/forward/applier" "github.com/numaproj/numaflow/pkg/watermark/generic" "github.com/numaproj/numaflow/pkg/watermark/store" "github.com/numaproj/numaflow/pkg/watermark/wmb" @@ -78,7 +77,7 @@ func TestRead(t *testing.T) { } idleManager, _ := wmb.NewIdleManager(1, len(toBuffers)) - mgen, err := NewMemGen(m, toBuffers, myForwardToAllTest{}, applier.Terminal, fetchWatermark, toVertexWmStores, publishWMStore, idleManager) + mgen, err := NewMemGen(m, toBuffers, myForwardToAllTest{}, nil, fetchWatermark, toVertexWmStores, publishWMStore, idleManager) assert.NoError(t, err) _ = mgen.Start() @@ -139,7 +138,7 @@ func TestStop(t *testing.T) { } idleManager, _ := wmb.NewIdleManager(1, len(toBuffers)) - mgen, err := NewMemGen(m, toBuffers, myForwardToAllTest{}, applier.Terminal, fetchWatermark, toVertexWmStores, publishWMStore, idleManager) + mgen, err := NewMemGen(m, toBuffers, myForwardToAllTest{}, nil, fetchWatermark, toVertexWmStores, publishWMStore, idleManager) assert.NoError(t, err) stop := mgen.Start() @@ -223,7 +222,7 @@ func TestWatermark(t *testing.T) { } publishWMStore, _ := store.BuildNoOpWatermarkStore() - mgen, err := NewMemGen(m, toBuffers, myForwardToAllTest{}, applier.Terminal, nil, nil, publishWMStore, nil) + mgen, err := NewMemGen(m, toBuffers, myForwardToAllTest{}, nil, nil, nil, publishWMStore, nil) assert.NoError(t, err) stop := mgen.Start() diff --git a/pkg/sources/http/http_test.go b/pkg/sources/http/http_test.go index c936d167c9..1b1d69b81e 100644 --- a/pkg/sources/http/http_test.go +++ b/pkg/sources/http/http_test.go @@ -26,7 +26,6 @@ import ( "github.com/numaproj/numaflow/pkg/forwarder" "github.com/numaproj/numaflow/pkg/isb" "github.com/numaproj/numaflow/pkg/isb/stores/simplebuffer" - "github.com/numaproj/numaflow/pkg/sources/forward/applier" "github.com/numaproj/numaflow/pkg/watermark/generic" "github.com/numaproj/numaflow/pkg/watermark/store" "github.com/numaproj/numaflow/pkg/watermark/wmb" @@ -87,7 +86,7 @@ func Test_NewHTTP(t *testing.T) { } idleManager, _ := wmb.NewIdleManager(1, len(toBuffers)) - h, err := New(vi, toBuffers, myForwardToAllTest{}, applier.Terminal, fetchWatermark, toVertexWmStores, publishWMStores, idleManager) + h, err := New(vi, toBuffers, myForwardToAllTest{}, nil, fetchWatermark, toVertexWmStores, publishWMStores, idleManager) assert.NoError(t, err) assert.False(t, h.(*httpSource).ready) assert.Equal(t, v.Spec.Name, h.GetName()) diff --git a/pkg/sources/kafka/handler_test.go b/pkg/sources/kafka/handler_test.go index 708fc91c78..0c01ddd692 100644 --- a/pkg/sources/kafka/handler_test.go +++ b/pkg/sources/kafka/handler_test.go @@ -30,7 +30,6 @@ import ( "github.com/numaproj/numaflow/pkg/isb" "github.com/numaproj/numaflow/pkg/isb/stores/simplebuffer" "github.com/numaproj/numaflow/pkg/shared/logging" - "github.com/numaproj/numaflow/pkg/sources/forward/applier" "github.com/numaproj/numaflow/pkg/watermark/generic" "github.com/numaproj/numaflow/pkg/watermark/store" "github.com/numaproj/numaflow/pkg/watermark/wmb" @@ -83,7 +82,7 @@ func TestMessageHandling(t *testing.T) { } idleManager, _ := wmb.NewIdleManager(1, len(toBuffers)) - ks, _ := NewKafkaSource(vi, toBuffers, myForwardToAllTest{}, applier.Terminal, fetchWatermark, toVertexWmStores, publishWMStore, idleManager, WithLogger(logging.NewLogger()), + ks, _ := NewKafkaSource(vi, toBuffers, myForwardToAllTest{}, nil, fetchWatermark, toVertexWmStores, publishWMStore, idleManager, WithLogger(logging.NewLogger()), WithBufferSize(100), WithReadTimeOut(100*time.Millisecond)) msg := &sarama.ConsumerMessage{ diff --git a/pkg/sources/kafka/reader_test.go b/pkg/sources/kafka/reader_test.go index 15d844e250..4a99130863 100644 --- a/pkg/sources/kafka/reader_test.go +++ b/pkg/sources/kafka/reader_test.go @@ -26,7 +26,6 @@ import ( "github.com/numaproj/numaflow/pkg/isb" "github.com/numaproj/numaflow/pkg/isb/stores/simplebuffer" "github.com/numaproj/numaflow/pkg/shared/logging" - "github.com/numaproj/numaflow/pkg/sources/forward/applier" "github.com/numaproj/numaflow/pkg/watermark/generic" "github.com/numaproj/numaflow/pkg/watermark/store" "github.com/numaproj/numaflow/pkg/watermark/wmb" @@ -61,7 +60,7 @@ func TestNewKafkasource(t *testing.T) { } idleManager, _ := wmb.NewIdleManager(1, len(toBuffers)) - ks, err := NewKafkaSource(vi, toBuffers, myForwardToAllTest{}, applier.Terminal, fetchWatermark, toVertexWmStores, publishWMStore, idleManager, WithLogger(logging.NewLogger()), WithBufferSize(100), WithReadTimeOut(100*time.Millisecond), WithGroupName("default")) + ks, err := NewKafkaSource(vi, toBuffers, myForwardToAllTest{}, nil, fetchWatermark, toVertexWmStores, publishWMStore, idleManager, WithLogger(logging.NewLogger()), WithBufferSize(100), WithReadTimeOut(100*time.Millisecond), WithGroupName("default")) // no errors if everything is good. assert.Nil(t, err) @@ -106,7 +105,7 @@ func TestGroupNameOverride(t *testing.T) { } idleManager, _ := wmb.NewIdleManager(1, len(toBuffers)) - ks, _ := NewKafkaSource(vi, toBuffers, myForwardToAllTest{}, applier.Terminal, fetchWatermark, toVertexWmStores, publishWMStore, idleManager, WithLogger(logging.NewLogger()), WithBufferSize(100), WithReadTimeOut(100*time.Millisecond), WithGroupName("default")) + ks, _ := NewKafkaSource(vi, toBuffers, myForwardToAllTest{}, nil, fetchWatermark, toVertexWmStores, publishWMStore, idleManager, WithLogger(logging.NewLogger()), WithBufferSize(100), WithReadTimeOut(100*time.Millisecond), WithGroupName("default")) assert.Equal(t, "default", ks.(*kafkaSource).groupName) @@ -141,7 +140,7 @@ func TestDefaultBufferSize(t *testing.T) { } idleManager, _ := wmb.NewIdleManager(1, len(toBuffers)) - ks, _ := NewKafkaSource(vi, toBuffers, myForwardToAllTest{}, applier.Terminal, fetchWatermark, toVertexWmStores, publishWMStore, idleManager, WithLogger(logging.NewLogger()), WithReadTimeOut(100*time.Millisecond), WithGroupName("default")) + ks, _ := NewKafkaSource(vi, toBuffers, myForwardToAllTest{}, nil, fetchWatermark, toVertexWmStores, publishWMStore, idleManager, WithLogger(logging.NewLogger()), WithReadTimeOut(100*time.Millisecond), WithGroupName("default")) assert.Equal(t, 100, ks.(*kafkaSource).handlerBuffer) @@ -176,7 +175,7 @@ func TestBufferSizeOverrides(t *testing.T) { } idleManager, _ := wmb.NewIdleManager(1, len(toBuffers)) - ks, _ := NewKafkaSource(vi, toBuffers, myForwardToAllTest{}, applier.Terminal, fetchWatermark, toVertexWmStores, publishWMStore, idleManager, WithLogger(logging.NewLogger()), WithBufferSize(110), WithReadTimeOut(100*time.Millisecond), WithGroupName("default")) + ks, _ := NewKafkaSource(vi, toBuffers, myForwardToAllTest{}, nil, fetchWatermark, toVertexWmStores, publishWMStore, idleManager, WithLogger(logging.NewLogger()), WithBufferSize(110), WithReadTimeOut(100*time.Millisecond), WithGroupName("default")) assert.Equal(t, 110, ks.(*kafkaSource).handlerBuffer) diff --git a/pkg/sources/nats/nats_test.go b/pkg/sources/nats/nats_test.go index c557682be9..d5b45528a4 100644 --- a/pkg/sources/nats/nats_test.go +++ b/pkg/sources/nats/nats_test.go @@ -30,7 +30,6 @@ import ( "github.com/numaproj/numaflow/pkg/isb" "github.com/numaproj/numaflow/pkg/isb/stores/simplebuffer" natstest "github.com/numaproj/numaflow/pkg/shared/clients/nats/test" - "github.com/numaproj/numaflow/pkg/sources/forward/applier" "github.com/numaproj/numaflow/pkg/sources/sourcer" "github.com/numaproj/numaflow/pkg/watermark/generic" "github.com/numaproj/numaflow/pkg/watermark/store" @@ -85,7 +84,7 @@ func newInstance(t *testing.T, vi *dfv1.VertexInstance) (sourcer.Sourcer, error) } idleManager, _ := wmb.NewIdleManager(1, len(toBuffers)) - return New(vi, toBuffers, myForwardToAllTest{}, applier.Terminal, fetchWatermark, toVertexWmStores, publishWMStores, idleManager, WithReadTimeout(1*time.Second)) + return New(vi, toBuffers, myForwardToAllTest{}, nil, fetchWatermark, toVertexWmStores, publishWMStores, idleManager, WithReadTimeout(1*time.Second)) } func Test_Single(t *testing.T) { diff --git a/pkg/sources/source.go b/pkg/sources/source.go index 17fc3b14c5..87c6426a68 100644 --- a/pkg/sources/source.go +++ b/pkg/sources/source.go @@ -142,7 +142,7 @@ func (sp *SourceProcessor) Start(ctx context.Context) error { } // created watermark related components only if watermark is enabled - // otherwise no op will used + // otherwise no op will be used if !sp.VertexInstance.Vertex.Spec.Watermark.Disabled { // build watermark stores for from vertex sourceWmStores, err = jetstream.BuildFromVertexWatermarkStores(ctx, sp.VertexInstance, natsClientPool.NextAvailableClient()) @@ -244,7 +244,7 @@ func (sp *SourceProcessor) Start(ctx context.Context) error { readyCheckers = append(readyCheckers, transformerGRPCClient) source, err = sp.getSourcer(writersMap, sp.getTransformerGoWhereDecider(shuffleFuncMap), transformerGRPCClient, udsGRPCClient, fetchWatermark, toVertexWatermarkStores, sourcePublisherStores, idleManager, log) } else { - source, err = sp.getSourcer(writersMap, sp.getSourceGoWhereDecider(shuffleFuncMap), applier.Terminal, udsGRPCClient, fetchWatermark, toVertexWatermarkStores, sourcePublisherStores, idleManager, log) + source, err = sp.getSourcer(writersMap, sp.getSourceGoWhereDecider(shuffleFuncMap), nil, udsGRPCClient, fetchWatermark, toVertexWatermarkStores, sourcePublisherStores, idleManager, log) } if err != nil { return fmt.Errorf("failed to find a source, error: %w", err) From 78ab8044ce43e1eb3a90c333f99ff54248071e18 Mon Sep 17 00:00:00 2001 From: a3hadi Date: Wed, 1 May 2024 14:00:36 -0400 Subject: [PATCH 2/9] add metric to no UDTransformer case Signed-off-by: a3hadi --- pkg/sources/forward/data_forward.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/sources/forward/data_forward.go b/pkg/sources/forward/data_forward.go index a066a5677f..986c9a34ca 100644 --- a/pkg/sources/forward/data_forward.go +++ b/pkg/sources/forward/data_forward.go @@ -312,6 +312,8 @@ func (df *DataForward) forwardAChunk(ctx context.Context) { } else { // if no user-defined transformer exists, then the messages to write will be identical to the messages read from source for idx, m := range readMessages { + // emit message size metric + metrics.ReadBytesCount.With(map[string]string{metrics.LabelVertex: df.vertexName, metrics.LabelPipeline: df.pipelineName, metrics.LabelVertexType: string(dfv1.VertexTypeSource), metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), metrics.LabelPartitionName: df.reader.GetName()}).Add(float64(len(m.Payload))) transformerResults[idx].readMessage = m transformerResults[idx].writeMessages = []*isb.WriteMessage{{Message: m.Message}} } From 350102a17f0e166e2ebeb5da2f925abba1925fd2 Mon Sep 17 00:00:00 2001 From: a3hadi Date: Wed, 1 May 2024 14:33:42 -0400 Subject: [PATCH 3/9] modularize the two for loops Signed-off-by: a3hadi --- pkg/sources/forward/data_forward.go | 44 ++++++++++++++++++----------- 1 file changed, 27 insertions(+), 17 deletions(-) diff --git a/pkg/sources/forward/data_forward.go b/pkg/sources/forward/data_forward.go index 986c9a34ca..81ed556e89 100644 --- a/pkg/sources/forward/data_forward.go +++ b/pkg/sources/forward/data_forward.go @@ -192,6 +192,31 @@ type readWriteMessagePair struct { transformerError error } +func processReadMessages(hasUDTransformer bool, readMessages []*isb.ReadMessage, processorWM wmb.Watermark, + transformerResults []readWriteMessagePair, df *DataForward, transformerCh chan *readWriteMessagePair) { + for idx, m := range readMessages { + // emit message size metric + metrics.ReadBytesCount.With(map[string]string{ + metrics.LabelVertex: df.vertexName, + metrics.LabelPipeline: df.pipelineName, + metrics.LabelVertexType: string(dfv1.VertexTypeSource), + metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), + metrics.LabelPartitionName: df.reader.GetName(), + }).Add(float64(len(m.Payload))) + // assign watermark to the message + m.Watermark = time.Time(processorWM) + transformerResults[idx].readMessage = m + + if hasUDTransformer { + // send transformer processing work to the channel + transformerCh <- &transformerResults[idx] + } else { + // if no user-defined transformer exists, then the messages to write will be identical to the messages read from source + transformerCh <- &transformerResults[idx] + } + } +} + // forwardAChunk forwards a chunk of message from the reader to the toBuffers. It does the Read -> Process -> Forward -> Ack chain // for a chunk of messages returned by the first Read call. It will return only if only we are successfully able to ack // the message after forwarding, barring any platform errors. The platform errors include buffer-full, @@ -290,17 +315,8 @@ func (df *DataForward) forwardAChunk(ctx context.Context) { }() } concurrentTransformerProcessingStart := time.Now() + processReadMessages(true, readMessages, processorWM, transformerResults, df, transformerCh) - // send to transformer only the data messages - for idx, m := range readMessages { - // emit message size metric - metrics.ReadBytesCount.With(map[string]string{metrics.LabelVertex: df.vertexName, metrics.LabelPipeline: df.pipelineName, metrics.LabelVertexType: string(dfv1.VertexTypeSource), metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), metrics.LabelPartitionName: df.reader.GetName()}).Add(float64(len(m.Payload))) - // assign watermark to the message - m.Watermark = time.Time(processorWM) - // send transformer processing work to the channel - transformerResults[idx].readMessage = m - transformerCh <- &transformerResults[idx] - } // let the go routines know that there is no more work close(transformerCh) // wait till the processing is done. this will not be an infinite wait because the transformer processing will exit if @@ -310,13 +326,7 @@ func (df *DataForward) forwardAChunk(ctx context.Context) { metrics.SourceTransformerConcurrentProcessingTime.With(map[string]string{metrics.LabelVertex: df.vertexName, metrics.LabelPipeline: df.pipelineName, metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), metrics.LabelPartitionName: df.reader.GetName()}).Observe(float64(time.Since(concurrentTransformerProcessingStart).Microseconds())) // transformer processing is done. } else { - // if no user-defined transformer exists, then the messages to write will be identical to the messages read from source - for idx, m := range readMessages { - // emit message size metric - metrics.ReadBytesCount.With(map[string]string{metrics.LabelVertex: df.vertexName, metrics.LabelPipeline: df.pipelineName, metrics.LabelVertexType: string(dfv1.VertexTypeSource), metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), metrics.LabelPartitionName: df.reader.GetName()}).Add(float64(len(m.Payload))) - transformerResults[idx].readMessage = m - transformerResults[idx].writeMessages = []*isb.WriteMessage{{Message: m.Message}} - } + processReadMessages(false, readMessages, processorWM, transformerResults, df, nil) } // publish source watermark and assign IsLate attribute based on new event time. From 7f7e2141bb3e9c1cf9ba4a34227b9bf19f407633 Mon Sep 17 00:00:00 2001 From: a3hadi Date: Wed, 1 May 2024 14:35:45 -0400 Subject: [PATCH 4/9] fix else condition Signed-off-by: a3hadi --- pkg/sources/forward/data_forward.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/sources/forward/data_forward.go b/pkg/sources/forward/data_forward.go index 81ed556e89..939d1bedd7 100644 --- a/pkg/sources/forward/data_forward.go +++ b/pkg/sources/forward/data_forward.go @@ -212,7 +212,7 @@ func processReadMessages(hasUDTransformer bool, readMessages []*isb.ReadMessage, transformerCh <- &transformerResults[idx] } else { // if no user-defined transformer exists, then the messages to write will be identical to the messages read from source - transformerCh <- &transformerResults[idx] + transformerResults[idx].writeMessages = []*isb.WriteMessage{{Message: m.Message}} } } } From 00fb6463827b55ae272faf2c950af96abecd1290 Mon Sep 17 00:00:00 2001 From: a3hadi Date: Wed, 1 May 2024 21:01:33 -0400 Subject: [PATCH 5/9] refactor Signed-off-by: a3hadi --- pkg/sources/forward/data_forward.go | 34 +++++++++++++++-------------- 1 file changed, 18 insertions(+), 16 deletions(-) diff --git a/pkg/sources/forward/data_forward.go b/pkg/sources/forward/data_forward.go index 939d1bedd7..a90b90cf33 100644 --- a/pkg/sources/forward/data_forward.go +++ b/pkg/sources/forward/data_forward.go @@ -192,8 +192,13 @@ type readWriteMessagePair struct { transformerError error } +// processReadMessages takes the messages read from source and processes them one of two ways: +// 1. If a user-defined transformer exists then it will send the messages to the transformer channel. Thus, the results of +// the transformer application on a read message will be stored as the corresponding writeMessage in readWriteMessagePairs +// 2. If a user-defined transformer does not exist, then there is no transformer application to take place, and so +// the unmodified read message will be stored as the corresponding writeMessage in readWriteMessagePairs func processReadMessages(hasUDTransformer bool, readMessages []*isb.ReadMessage, processorWM wmb.Watermark, - transformerResults []readWriteMessagePair, df *DataForward, transformerCh chan *readWriteMessagePair) { + readWriteMessagePairs []readWriteMessagePair, df *DataForward, transformerCh chan *readWriteMessagePair) { for idx, m := range readMessages { // emit message size metric metrics.ReadBytesCount.With(map[string]string{ @@ -205,14 +210,14 @@ func processReadMessages(hasUDTransformer bool, readMessages []*isb.ReadMessage, }).Add(float64(len(m.Payload))) // assign watermark to the message m.Watermark = time.Time(processorWM) - transformerResults[idx].readMessage = m + readWriteMessagePairs[idx].readMessage = m if hasUDTransformer { // send transformer processing work to the channel - transformerCh <- &transformerResults[idx] + transformerCh <- &readWriteMessagePairs[idx] } else { - // if no user-defined transformer exists, then the messages to write will be identical to the messages read from source - transformerResults[idx].writeMessages = []*isb.WriteMessage{{Message: m.Message}} + // if no user-defined transformer exists, then the messages to write will be identical to the message read from source + readWriteMessagePairs[idx].writeMessages = []*isb.WriteMessage{{Message: m.Message}} } } } @@ -295,16 +300,13 @@ func (df *DataForward) forwardAChunk(ctx context.Context) { messageToStep[toVertex] = make([][]isb.Message, len(df.toBuffers[toVertex])) } - // user-defined transformer concurrent processing request channel - transformerCh := make(chan *readWriteMessagePair) - // transformerResults stores the results after user-defined transformer processing for all read messages. It indexes - // a read message to the corresponding write message - transformerResults := make([]readWriteMessagePair, len(readMessages)) - // applyTransformer, if there is an Internal error, it is a blocking call and - // will return only if shutdown has been initiated. + readWriteMessagePairs := make([]readWriteMessagePair, len(readMessages)) // If a user defined transformer exists, apply it if df.transformer != nil { + // user-defined transformer concurrent processing request channel + transformerCh := make(chan *readWriteMessagePair) + // create a pool of Transformer Processors var wg sync.WaitGroup for i := 0; i < df.opts.transformerConcurrency; i++ { @@ -315,7 +317,7 @@ func (df *DataForward) forwardAChunk(ctx context.Context) { }() } concurrentTransformerProcessingStart := time.Now() - processReadMessages(true, readMessages, processorWM, transformerResults, df, transformerCh) + processReadMessages(true, readMessages, processorWM, readWriteMessagePairs, df, transformerCh) // let the go routines know that there is no more work close(transformerCh) @@ -326,7 +328,7 @@ func (df *DataForward) forwardAChunk(ctx context.Context) { metrics.SourceTransformerConcurrentProcessingTime.With(map[string]string{metrics.LabelVertex: df.vertexName, metrics.LabelPipeline: df.pipelineName, metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), metrics.LabelPartitionName: df.reader.GetName()}).Observe(float64(time.Since(concurrentTransformerProcessingStart).Microseconds())) // transformer processing is done. } else { - processReadMessages(false, readMessages, processorWM, transformerResults, df, nil) + processReadMessages(false, readMessages, processorWM, readWriteMessagePairs, df, nil) } // publish source watermark and assign IsLate attribute based on new event time. @@ -334,7 +336,7 @@ func (df *DataForward) forwardAChunk(ctx context.Context) { var transformedReadMessages []*isb.ReadMessage latestEtMap := make(map[int32]int64) - for _, m := range transformerResults { + for _, m := range readWriteMessagePairs { writeMessages = append(writeMessages, m.writeMessages...) for _, message := range m.writeMessages { message.Headers = m.readMessage.Headers @@ -365,7 +367,7 @@ func (df *DataForward) forwardAChunk(ctx context.Context) { var sourcePartitionsIndices = make(map[int32]bool) // let's figure out which vertex to send the results to. // update the toBuffer(s) with writeMessages. - for _, m := range transformerResults { + for _, m := range readWriteMessagePairs { // Look for errors in transformer processing if we see even 1 error we return. // Handling partial retrying is not worth ATM. if m.transformerError != nil { From 1cea4f95127a2509239fc71690864dafdc980d1a Mon Sep 17 00:00:00 2001 From: a3hadi Date: Wed, 1 May 2024 21:19:13 -0400 Subject: [PATCH 6/9] make code more readable Signed-off-by: a3hadi --- pkg/sources/forward/data_forward.go | 178 ++++++++++++++++++++++++---- 1 file changed, 153 insertions(+), 25 deletions(-) diff --git a/pkg/sources/forward/data_forward.go b/pkg/sources/forward/data_forward.go index a90b90cf33..07efbf2007 100644 --- a/pkg/sources/forward/data_forward.go +++ b/pkg/sources/forward/data_forward.go @@ -164,7 +164,10 @@ func (df *DataForward) Start() <-chan struct{} { for _, buffer := range df.toBuffers { for _, partition := range buffer { if err := partition.Close(); err != nil { - log.Errorw("Failed to close partition writer, shutdown anyways...", zap.Error(err), zap.String("bufferTo", partition.GetName())) + log.Errorw("Failed to close partition writer, shutdown anyways...", + zap.Error(err), + zap.String("bufferTo", partition.GetName()), + ) } else { log.Infow("Closed partition writer", zap.String("bufferTo", partition.GetName())) } @@ -208,6 +211,7 @@ func processReadMessages(hasUDTransformer bool, readMessages []*isb.ReadMessage, metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), metrics.LabelPartitionName: df.reader.GetName(), }).Add(float64(len(m.Payload))) + // assign watermark to the message m.Watermark = time.Time(processorWM) readWriteMessagePairs[idx].readMessage = m @@ -234,7 +238,13 @@ func (df *DataForward) forwardAChunk(ctx context.Context) { readMessages, err := df.reader.Read(ctx, df.opts.readBatchSize) if err != nil { df.opts.logger.Warnw("failed to read from source", zap.Error(err)) - metrics.ReadMessagesError.With(map[string]string{metrics.LabelVertex: df.vertexName, metrics.LabelPipeline: df.pipelineName, metrics.LabelVertexType: string(dfv1.VertexTypeSource), metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), metrics.LabelPartitionName: df.reader.GetName()}).Inc() + metrics.ReadMessagesError.With(map[string]string{ + metrics.LabelVertex: df.vertexName, + metrics.LabelPipeline: df.pipelineName, + metrics.LabelVertexType: string(dfv1.VertexTypeSource), + metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), + metrics.LabelPartitionName: df.reader.GetName(), + }).Inc() } // if there are no read messages, we return early. @@ -266,7 +276,8 @@ func (df *DataForward) forwardAChunk(ctx context.Context) { publisher = df.createToVertexWatermarkPublisher(toVertexName, sp) vertexPublishers[sp] = publisher } - idlehandler.PublishIdleWatermark(ctx, wmb.PARTITION_0, df.toBuffers[toVertexName][index], publisher, df.idleManager, df.opts.logger, df.vertexName, df.pipelineName, dfv1.VertexTypeSource, df.vertexReplica, fetchedWm) + idlehandler.PublishIdleWatermark(ctx, wmb.PARTITION_0, df.toBuffers[toVertexName][index], publisher, + df.idleManager, df.opts.logger, df.vertexName, df.pipelineName, dfv1.VertexTypeSource, df.vertexReplica, fetchedWm) } } } @@ -278,9 +289,21 @@ func (df *DataForward) forwardAChunk(ctx context.Context) { // reset the idle handler because we have read messages df.srcIdleHandler.Reset() - - metrics.ReadDataMessagesCount.With(map[string]string{metrics.LabelVertex: df.vertexName, metrics.LabelPipeline: df.pipelineName, metrics.LabelVertexType: string(dfv1.VertexTypeSource), metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), metrics.LabelPartitionName: df.reader.GetName()}).Add(float64(len(readMessages))) - metrics.ReadMessagesCount.With(map[string]string{metrics.LabelVertex: df.vertexName, metrics.LabelPipeline: df.pipelineName, metrics.LabelVertexType: string(dfv1.VertexTypeSource), metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), metrics.LabelPartitionName: df.reader.GetName()}).Add(float64(len(readMessages))) + metrics.ReadDataMessagesCount.With(map[string]string{ + metrics.LabelVertex: df.vertexName, + metrics.LabelPipeline: df.pipelineName, + metrics.LabelVertexType: string(dfv1.VertexTypeSource), + metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), + metrics.LabelPartitionName: df.reader.GetName()}, + ).Add(float64(len(readMessages))) + + metrics.ReadMessagesCount.With(map[string]string{ + metrics.LabelVertex: df.vertexName, + metrics.LabelPipeline: df.pipelineName, + metrics.LabelVertexType: string(dfv1.VertexTypeSource), + metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), + metrics.LabelPartitionName: df.reader.GetName(), + }).Add(float64(len(readMessages))) // store the offsets of the messages we read from source var readOffsets = make([]isb.Offset, len(readMessages)) @@ -324,8 +347,17 @@ func (df *DataForward) forwardAChunk(ctx context.Context) { // wait till the processing is done. this will not be an infinite wait because the transformer processing will exit if // context.Done() is closed. wg.Wait() - df.opts.logger.Debugw("concurrent applyTransformer completed", zap.Int("concurrency", df.opts.transformerConcurrency), zap.Duration("took", time.Since(concurrentTransformerProcessingStart))) - metrics.SourceTransformerConcurrentProcessingTime.With(map[string]string{metrics.LabelVertex: df.vertexName, metrics.LabelPipeline: df.pipelineName, metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), metrics.LabelPartitionName: df.reader.GetName()}).Observe(float64(time.Since(concurrentTransformerProcessingStart).Microseconds())) + df.opts.logger.Debugw("concurrent applyTransformer completed", + zap.Int("concurrency", df.opts.transformerConcurrency), + zap.Duration("took", time.Since(concurrentTransformerProcessingStart)), + ) + + metrics.SourceTransformerConcurrentProcessingTime.With(map[string]string{ + metrics.LabelVertex: df.vertexName, + metrics.LabelPipeline: df.pipelineName, + metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), + metrics.LabelPartitionName: df.reader.GetName(), + }).Observe(float64(time.Since(concurrentTransformerProcessingStart).Microseconds())) // transformer processing is done. } else { processReadMessages(false, readMessages, processorWM, readWriteMessagePairs, df, nil) @@ -371,7 +403,13 @@ func (df *DataForward) forwardAChunk(ctx context.Context) { // Look for errors in transformer processing if we see even 1 error we return. // Handling partial retrying is not worth ATM. if m.transformerError != nil { - metrics.SourceTransformerError.With(map[string]string{metrics.LabelVertex: df.vertexName, metrics.LabelPipeline: df.pipelineName, metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), metrics.LabelPartitionName: df.reader.GetName()}).Inc() + metrics.SourceTransformerError.With(map[string]string{ + metrics.LabelVertex: df.vertexName, + metrics.LabelPipeline: df.pipelineName, + metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), + metrics.LabelPartitionName: df.reader.GetName(), + }).Inc() + df.opts.logger.Errorw("failed to apply source transformer", zap.Error(m.transformerError)) return } @@ -440,7 +478,8 @@ func (df *DataForward) forwardAChunk(ctx context.Context) { publisher = df.createToVertexWatermarkPublisher(toVertexName, sp) vertexPublishers[sp] = publisher } - idlehandler.PublishIdleWatermark(ctx, wmb.PARTITION_0, df.toBuffers[toVertexName][index], publisher, df.idleManager, df.opts.logger, df.vertexName, df.pipelineName, dfv1.VertexTypeSource, df.vertexReplica, processorWM) + idlehandler.PublishIdleWatermark(ctx, wmb.PARTITION_0, df.toBuffers[toVertexName][index], publisher, + df.idleManager, df.opts.logger, df.vertexName, df.pipelineName, dfv1.VertexTypeSource, df.vertexReplica, processorWM) } } } @@ -453,13 +492,31 @@ func (df *DataForward) forwardAChunk(ctx context.Context) { // implicit return for posterity :-) if err != nil { df.opts.logger.Errorw("failed to ack from source", zap.Error(err)) - metrics.AckMessageError.With(map[string]string{metrics.LabelVertex: df.vertexName, metrics.LabelPipeline: df.pipelineName, metrics.LabelVertexType: string(dfv1.VertexTypeSource), metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), metrics.LabelPartitionName: df.reader.GetName()}).Add(float64(len(readOffsets))) + metrics.AckMessageError.With(map[string]string{ + metrics.LabelVertex: df.vertexName, + metrics.LabelPipeline: df.pipelineName, + metrics.LabelVertexType: string(dfv1.VertexTypeSource), + metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), + metrics.LabelPartitionName: df.reader.GetName(), + }).Add(float64(len(readOffsets))) + return } - metrics.AckMessagesCount.With(map[string]string{metrics.LabelVertex: df.vertexName, metrics.LabelPipeline: df.pipelineName, metrics.LabelVertexType: string(dfv1.VertexTypeSource), metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), metrics.LabelPartitionName: df.reader.GetName()}).Add(float64(len(readOffsets))) + metrics.AckMessagesCount.With(map[string]string{ + metrics.LabelVertex: df.vertexName, + metrics.LabelPipeline: df.pipelineName, + metrics.LabelVertexType: string(dfv1.VertexTypeSource), + metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), + metrics.LabelPartitionName: df.reader.GetName(), + }).Add(float64(len(readOffsets))) // ProcessingTimes of the entire forwardAChunk - metrics.ForwardAChunkProcessingTime.With(map[string]string{metrics.LabelVertex: df.vertexName, metrics.LabelPipeline: df.pipelineName, metrics.LabelVertexType: string(dfv1.VertexTypeSource), metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica))}).Observe(float64(time.Since(start).Microseconds())) + metrics.ForwardAChunkProcessingTime.With(map[string]string{ + metrics.LabelVertex: df.vertexName, + metrics.LabelPipeline: df.pipelineName, + metrics.LabelVertexType: string(dfv1.VertexTypeSource), + metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), + }).Observe(float64(time.Since(start).Microseconds())) } func (df *DataForward) ackFromSource(ctx context.Context, offsets []isb.Offset) error { @@ -531,15 +588,32 @@ func (df *DataForward) writeToBuffer(ctx context.Context, toBufferPartition isb. metrics.LabelReason: err.Error(), }).Add(float64(len(msg.Payload))) - df.opts.logger.Infow("Dropped message", zap.String("reason", err.Error()), zap.String("partition", toBufferPartition.GetName()), zap.String("vertex", df.vertexName), zap.String("pipeline", df.pipelineName)) + df.opts.logger.Infow("Dropped message", + zap.String("reason", err.Error()), + zap.String("partition", toBufferPartition.GetName()), + zap.String("vertex", df.vertexName), zap.String("pipeline", df.pipelineName), + ) } else { needRetry = true // we retry only failed messages failedMessages = append(failedMessages, msg) - metrics.WriteMessagesError.With(map[string]string{metrics.LabelVertex: df.vertexName, metrics.LabelPipeline: df.pipelineName, metrics.LabelVertexType: string(dfv1.VertexTypeSource), metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), metrics.LabelPartitionName: toBufferPartition.GetName()}).Inc() + metrics.WriteMessagesError.With(map[string]string{ + metrics.LabelVertex: df.vertexName, + metrics.LabelPipeline: df.pipelineName, + metrics.LabelVertexType: string(dfv1.VertexTypeSource), + metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), + metrics.LabelPartitionName: toBufferPartition.GetName(), + }).Inc() + // a shutdown can break the blocking loop caused due to InternalErr if ok, _ := df.IsShuttingDown(); ok { - metrics.PlatformError.With(map[string]string{metrics.LabelVertex: df.vertexName, metrics.LabelPipeline: df.pipelineName, metrics.LabelVertexType: string(dfv1.VertexTypeSource), metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica))}).Inc() + metrics.PlatformError.With(map[string]string{ + metrics.LabelVertex: df.vertexName, + metrics.LabelPipeline: df.pipelineName, + metrics.LabelVertexType: string(dfv1.VertexTypeSource), + metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), + }).Inc() + return writeOffsets, fmt.Errorf("writeToBuffer failed, Stop called while stuck on an internal error with failed messages:%d, %v", len(failedMessages), errs) } } @@ -569,8 +643,22 @@ func (df *DataForward) writeToBuffer(ctx context.Context, toBufferPartition isb. } } - metrics.WriteMessagesCount.With(map[string]string{metrics.LabelVertex: df.vertexName, metrics.LabelPipeline: df.pipelineName, metrics.LabelVertexType: string(dfv1.VertexTypeSource), metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), metrics.LabelPartitionName: toBufferPartition.GetName()}).Add(float64(writeCount)) - metrics.WriteBytesCount.With(map[string]string{metrics.LabelVertex: df.vertexName, metrics.LabelPipeline: df.pipelineName, metrics.LabelVertexType: string(dfv1.VertexTypeSource), metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), metrics.LabelPartitionName: toBufferPartition.GetName()}).Add(writeBytes) + metrics.WriteMessagesCount.With(map[string]string{ + metrics.LabelVertex: df.vertexName, + metrics.LabelPipeline: df.pipelineName, + metrics.LabelVertexType: string(dfv1.VertexTypeSource), + metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), + metrics.LabelPartitionName: toBufferPartition.GetName(), + }).Add(float64(writeCount)) + + metrics.WriteBytesCount.With(map[string]string{ + metrics.LabelVertex: df.vertexName, + metrics.LabelPipeline: df.pipelineName, + metrics.LabelVertexType: string(dfv1.VertexTypeSource), + metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), + metrics.LabelPartitionName: toBufferPartition.GetName(), + }).Add(writeBytes) + return writeOffsets, nil } @@ -578,12 +666,29 @@ func (df *DataForward) writeToBuffer(ctx context.Context, toBufferPartition isb. func (df *DataForward) concurrentApplyTransformer(ctx context.Context, readMessagePair <-chan *readWriteMessagePair) { for message := range readMessagePair { start := time.Now() - metrics.SourceTransformerReadMessagesCount.With(map[string]string{metrics.LabelVertex: df.vertexName, metrics.LabelPipeline: df.pipelineName, metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), metrics.LabelPartitionName: df.reader.GetName()}).Inc() + metrics.SourceTransformerReadMessagesCount.With(map[string]string{ + metrics.LabelVertex: df.vertexName, + metrics.LabelPipeline: df.pipelineName, + metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), + metrics.LabelPartitionName: df.reader.GetName(), + }).Inc() + writeMessages, err := df.applyTransformer(ctx, message.readMessage) - metrics.SourceTransformerWriteMessagesCount.With(map[string]string{metrics.LabelVertex: df.vertexName, metrics.LabelPipeline: df.pipelineName, metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), metrics.LabelPartitionName: df.reader.GetName()}).Add(float64(len(writeMessages))) + metrics.SourceTransformerWriteMessagesCount.With(map[string]string{ + metrics.LabelVertex: df.vertexName, + metrics.LabelPipeline: df.pipelineName, + metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), + metrics.LabelPartitionName: df.reader.GetName(), + }).Add(float64(len(writeMessages))) + message.writeMessages = append(message.writeMessages, writeMessages...) message.transformerError = err - metrics.SourceTransformerProcessingTime.With(map[string]string{metrics.LabelVertex: df.vertexName, metrics.LabelPipeline: df.pipelineName, metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), metrics.LabelPartitionName: df.reader.GetName()}).Observe(float64(time.Since(start).Microseconds())) + metrics.SourceTransformerProcessingTime.With(map[string]string{ + metrics.LabelVertex: df.vertexName, + metrics.LabelPipeline: df.pipelineName, + metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), + metrics.LabelPartitionName: df.reader.GetName(), + }).Observe(float64(time.Since(start).Microseconds())) } } @@ -602,7 +707,13 @@ func (df *DataForward) applyTransformer(ctx context.Context, readMessage *isb.Re // this does not mean we should prohibit this from a shutdown. if ok, _ := df.IsShuttingDown(); ok { df.opts.logger.Errorw("Transformer.Apply, Stop called while stuck on an internal error", zap.Error(err)) - metrics.PlatformError.With(map[string]string{metrics.LabelVertex: df.vertexName, metrics.LabelPipeline: df.pipelineName, metrics.LabelVertexType: string(dfv1.VertexTypeSource), metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica))}).Inc() + metrics.PlatformError.With(map[string]string{ + metrics.LabelVertex: df.vertexName, + metrics.LabelPipeline: df.pipelineName, + metrics.LabelVertexType: string(dfv1.VertexTypeSource), + metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), + }).Inc() + return nil, err } continue @@ -620,11 +731,23 @@ func (df *DataForward) whereToStep(writeMessage *isb.WriteMessage, messageToStep // call WhereTo and drop it on errors to, err := df.toWhichStepDecider.WhereTo(writeMessage.Keys, writeMessage.Tags, writeMessage.ID) if err != nil { - df.opts.logger.Errorw("failed in whereToStep", zap.Error(isb.MessageWriteErr{Name: df.reader.GetName(), Header: readMessage.Header, Body: readMessage.Body, Message: fmt.Sprintf("WhereTo failed, %s", err)})) + df.opts.logger.Errorw("failed in whereToStep", zap.Error(isb.MessageWriteErr{ + Name: df.reader.GetName(), + Header: readMessage.Header, + Body: readMessage.Body, + Message: fmt.Sprintf("WhereTo failed, %s", err), + })) + // a shutdown can break the blocking loop caused due to InternalErr if ok, _ := df.IsShuttingDown(); ok { err := fmt.Errorf("whereToStep, Stop called while stuck on an internal error, %v", err) - metrics.PlatformError.With(map[string]string{metrics.LabelVertex: df.vertexName, metrics.LabelPipeline: df.pipelineName, metrics.LabelVertexType: string(dfv1.VertexTypeSource), metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica))}).Inc() + metrics.PlatformError.With(map[string]string{ + metrics.LabelVertex: df.vertexName, + metrics.LabelPipeline: df.pipelineName, + metrics.LabelVertexType: string(dfv1.VertexTypeSource), + metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), + }).Inc() + return err } return err @@ -632,7 +755,12 @@ func (df *DataForward) whereToStep(writeMessage *isb.WriteMessage, messageToStep for _, t := range to { if _, ok := messageToStep[t.ToVertexName]; !ok { - df.opts.logger.Errorw("failed in whereToStep", zap.Error(isb.MessageWriteErr{Name: df.reader.GetName(), Header: readMessage.Header, Body: readMessage.Body, Message: fmt.Sprintf("no such destination (%s)", t.ToVertexName)})) + df.opts.logger.Errorw("failed in whereToStep", zap.Error(isb.MessageWriteErr{ + Name: df.reader.GetName(), + Header: readMessage.Header, + Body: readMessage.Body, + Message: fmt.Sprintf("no such destination (%s)", t.ToVertexName), + })) } messageToStep[t.ToVertexName][t.ToVertexPartitionIdx] = append(messageToStep[t.ToVertexName][t.ToVertexPartitionIdx], writeMessage.Message) } From 5b271b5ca2a043aa09961b95284b415395113235 Mon Sep 17 00:00:00 2001 From: a3hadi Date: Thu, 2 May 2024 21:12:47 -0400 Subject: [PATCH 7/9] refactor Signed-off-by: a3hadi --- pkg/sources/forward/data_forward.go | 62 ++++++++++++++--------------- 1 file changed, 29 insertions(+), 33 deletions(-) diff --git a/pkg/sources/forward/data_forward.go b/pkg/sources/forward/data_forward.go index 07efbf2007..64a80eff10 100644 --- a/pkg/sources/forward/data_forward.go +++ b/pkg/sources/forward/data_forward.go @@ -195,35 +195,15 @@ type readWriteMessagePair struct { transformerError error } -// processReadMessages takes the messages read from source and processes them one of two ways: -// 1. If a user-defined transformer exists then it will send the messages to the transformer channel. Thus, the results of -// the transformer application on a read message will be stored as the corresponding writeMessage in readWriteMessagePairs -// 2. If a user-defined transformer does not exist, then there is no transformer application to take place, and so -// the unmodified read message will be stored as the corresponding writeMessage in readWriteMessagePairs -func processReadMessages(hasUDTransformer bool, readMessages []*isb.ReadMessage, processorWM wmb.Watermark, - readWriteMessagePairs []readWriteMessagePair, df *DataForward, transformerCh chan *readWriteMessagePair) { - for idx, m := range readMessages { - // emit message size metric - metrics.ReadBytesCount.With(map[string]string{ - metrics.LabelVertex: df.vertexName, - metrics.LabelPipeline: df.pipelineName, - metrics.LabelVertexType: string(dfv1.VertexTypeSource), - metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), - metrics.LabelPartitionName: df.reader.GetName(), - }).Add(float64(len(m.Payload))) - - // assign watermark to the message - m.Watermark = time.Time(processorWM) - readWriteMessagePairs[idx].readMessage = m - - if hasUDTransformer { - // send transformer processing work to the channel - transformerCh <- &readWriteMessagePairs[idx] - } else { - // if no user-defined transformer exists, then the messages to write will be identical to the message read from source - readWriteMessagePairs[idx].writeMessages = []*isb.WriteMessage{{Message: m.Message}} - } - } +// emitMessageSizeMetric emits the message size metric +func emitMessageSizeMetric(df *DataForward, m *isb.ReadMessage) { + metrics.ReadBytesCount.With(map[string]string{ + metrics.LabelVertex: df.vertexName, + metrics.LabelPipeline: df.pipelineName, + metrics.LabelVertexType: string(dfv1.VertexTypeSource), + metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), + metrics.LabelPartitionName: df.reader.GetName(), + }).Add(float64(len(m.Payload))) } // forwardAChunk forwards a chunk of message from the reader to the toBuffers. It does the Read -> Process -> Forward -> Ack chain @@ -325,7 +305,8 @@ func (df *DataForward) forwardAChunk(ctx context.Context) { readWriteMessagePairs := make([]readWriteMessagePair, len(readMessages)) - // If a user defined transformer exists, apply it + // If a user-defined transformer exists then it will send the messages to the transformer channel. Thus, the results of + // the transformer application on a read message will be stored as the corresponding writeMessage in readWriteMessagePairs if df.transformer != nil { // user-defined transformer concurrent processing request channel transformerCh := make(chan *readWriteMessagePair) @@ -340,7 +321,14 @@ func (df *DataForward) forwardAChunk(ctx context.Context) { }() } concurrentTransformerProcessingStart := time.Now() - processReadMessages(true, readMessages, processorWM, readWriteMessagePairs, df, transformerCh) + for idx, m := range readMessages { + emitMessageSizeMetric(df, m) + // assign watermark to the message + m.Watermark = time.Time(processorWM) + readWriteMessagePairs[idx].readMessage = m + // send transformer processing work to the channel + transformerCh <- &readWriteMessagePairs[idx] + } // let the go routines know that there is no more work close(transformerCh) @@ -358,9 +346,17 @@ func (df *DataForward) forwardAChunk(ctx context.Context) { metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), metrics.LabelPartitionName: df.reader.GetName(), }).Observe(float64(time.Since(concurrentTransformerProcessingStart).Microseconds())) - // transformer processing is done. + // If a user-defined transformer does not exist, then there is no transformer application to take place, and so + // the unmodified read message will be stored as the corresponding writeMessage in readWriteMessagePairs } else { - processReadMessages(false, readMessages, processorWM, readWriteMessagePairs, df, nil) + for idx, m := range readMessages { + emitMessageSizeMetric(df, m) + // assign watermark to the message + m.Watermark = time.Time(processorWM) + readWriteMessagePairs[idx].readMessage = m + // if no user-defined transformer exists, then the messages to write will be identical to the message read from source + readWriteMessagePairs[idx].writeMessages = []*isb.WriteMessage{{Message: m.Message}} + } } // publish source watermark and assign IsLate attribute based on new event time. From d255f919fc84c24c95c27b72a438de5741b6a0cb Mon Sep 17 00:00:00 2001 From: a3hadi Date: Thu, 2 May 2024 21:19:07 -0400 Subject: [PATCH 8/9] update comments Signed-off-by: a3hadi --- pkg/sources/forward/data_forward.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/pkg/sources/forward/data_forward.go b/pkg/sources/forward/data_forward.go index 64a80eff10..399b44e733 100644 --- a/pkg/sources/forward/data_forward.go +++ b/pkg/sources/forward/data_forward.go @@ -305,8 +305,7 @@ func (df *DataForward) forwardAChunk(ctx context.Context) { readWriteMessagePairs := make([]readWriteMessagePair, len(readMessages)) - // If a user-defined transformer exists then it will send the messages to the transformer channel. Thus, the results of - // the transformer application on a read message will be stored as the corresponding writeMessage in readWriteMessagePairs + // If a user-defined transformer exists, apply it if df.transformer != nil { // user-defined transformer concurrent processing request channel transformerCh := make(chan *readWriteMessagePair) @@ -326,7 +325,8 @@ func (df *DataForward) forwardAChunk(ctx context.Context) { // assign watermark to the message m.Watermark = time.Time(processorWM) readWriteMessagePairs[idx].readMessage = m - // send transformer processing work to the channel + // send transformer processing work to the channel. Thus, the results of the transformer + // application on a read message will be stored as the corresponding writeMessage in readWriteMessagePairs transformerCh <- &readWriteMessagePairs[idx] } @@ -346,8 +346,6 @@ func (df *DataForward) forwardAChunk(ctx context.Context) { metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), metrics.LabelPartitionName: df.reader.GetName(), }).Observe(float64(time.Since(concurrentTransformerProcessingStart).Microseconds())) - // If a user-defined transformer does not exist, then there is no transformer application to take place, and so - // the unmodified read message will be stored as the corresponding writeMessage in readWriteMessagePairs } else { for idx, m := range readMessages { emitMessageSizeMetric(df, m) @@ -355,6 +353,7 @@ func (df *DataForward) forwardAChunk(ctx context.Context) { m.Watermark = time.Time(processorWM) readWriteMessagePairs[idx].readMessage = m // if no user-defined transformer exists, then the messages to write will be identical to the message read from source + // thus, the unmodified read message will be stored as the corresponding writeMessage in readWriteMessagePairs readWriteMessagePairs[idx].writeMessages = []*isb.WriteMessage{{Message: m.Message}} } } From ad8df92bca3bd1356d43ffca25ee29163accf1ce Mon Sep 17 00:00:00 2001 From: a3hadi Date: Mon, 6 May 2024 11:39:59 -0400 Subject: [PATCH 9/9] change transformer to nil Signed-off-by: a3hadi --- pkg/sources/source.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/sources/source.go b/pkg/sources/source.go index fdd72c58e0..1bad4b66e9 100644 --- a/pkg/sources/source.go +++ b/pkg/sources/source.go @@ -41,7 +41,6 @@ import ( sharedutil "github.com/numaproj/numaflow/pkg/shared/util" "github.com/numaproj/numaflow/pkg/shuffle" sourceforward "github.com/numaproj/numaflow/pkg/sources/forward" - "github.com/numaproj/numaflow/pkg/sources/forward/applier" "github.com/numaproj/numaflow/pkg/sources/generator" "github.com/numaproj/numaflow/pkg/sources/http" "github.com/numaproj/numaflow/pkg/sources/kafka" @@ -271,7 +270,7 @@ func (sp *SourceProcessor) Start(ctx context.Context) error { if sp.VertexInstance.Vertex.HasUDTransformer() { sourceForwarder, err = sourceforward.NewDataForward(sp.VertexInstance, sourceReader, writersMap, sp.getTransformerGoWhereDecider(shuffleFuncMap), srcTransformerGRPCClient, fetchWatermark, sourceWmPublisher, toVertexWatermarkStores, idleManager, forwardOpts...) } else { - sourceForwarder, err = sourceforward.NewDataForward(sp.VertexInstance, sourceReader, writersMap, sp.getSourceGoWhereDecider(shuffleFuncMap), applier.Terminal, fetchWatermark, sourceWmPublisher, toVertexWatermarkStores, idleManager, forwardOpts...) + sourceForwarder, err = sourceforward.NewDataForward(sp.VertexInstance, sourceReader, writersMap, sp.getSourceGoWhereDecider(shuffleFuncMap), nil, fetchWatermark, sourceWmPublisher, toVertexWatermarkStores, idleManager, forwardOpts...) } if err != nil { return fmt.Errorf("failed to create source forwarder, error: %w", err)